HDFS-15240. Erasure Coding: dirty buffer causes reconstruction block error. Contributed by HuangTao.
This commit is contained in:
parent
8c234fc0d4
commit
e2c1268ebd
@ -96,6 +96,7 @@ public synchronized ByteBuffer getBuffer(boolean direct, int length) {
|
||||
ByteBuffer.allocate(length);
|
||||
}
|
||||
tree.remove(entry.getKey());
|
||||
entry.getValue().clear();
|
||||
return entry.getValue();
|
||||
}
|
||||
|
||||
|
@ -96,6 +96,22 @@ public void throwTooManyOpenFiles() throws FileNotFoundException {
|
||||
*/
|
||||
public void stripedBlockReconstruction() throws IOException {}
|
||||
|
||||
/**
|
||||
* Used as a hook to inject latency when read block
|
||||
* in erasure coding reconstruction process.
|
||||
*/
|
||||
public void delayBlockReader() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to inject intercept when free the block reader buffer.
|
||||
*/
|
||||
public void interceptFreeBlockReaderBuffer() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to inject intercept When finish reading from block.
|
||||
*/
|
||||
public void interceptBlockReader() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to inject intercept when BPOfferService hold lock.
|
||||
*/
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
@ -95,6 +96,7 @@ ByteBuffer getReadBuffer() {
|
||||
}
|
||||
|
||||
void freeReadBuffer() {
|
||||
DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer();
|
||||
buffer = null;
|
||||
}
|
||||
|
||||
@ -179,6 +181,8 @@ public BlockReadStats call() throws Exception {
|
||||
} catch (IOException e) {
|
||||
LOG.info(e.getMessage());
|
||||
throw e;
|
||||
} finally {
|
||||
DataNodeFaultInjector.get().interceptBlockReader();
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -188,6 +192,7 @@ public BlockReadStats call() throws Exception {
|
||||
* Perform actual reading of bytes from block.
|
||||
*/
|
||||
private BlockReadStats actualReadFromBlock() throws IOException {
|
||||
DataNodeFaultInjector.get().delayBlockReader();
|
||||
int len = buffer.remaining();
|
||||
int n = 0;
|
||||
while (n < len) {
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -328,14 +329,14 @@ int[] doReadMinimumSources(int reconstructLength,
|
||||
// cancel remaining reads if we read successfully from minimum
|
||||
// number of source DNs required by reconstruction.
|
||||
cancelReads(futures.keySet());
|
||||
futures.clear();
|
||||
clearFuturesAndService();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Read data interrupted.", e);
|
||||
cancelReads(futures.keySet());
|
||||
futures.clear();
|
||||
clearFuturesAndService();
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -429,6 +430,20 @@ private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
|
||||
}
|
||||
}
|
||||
|
||||
// remove all stale futures from readService, and clear futures.
|
||||
private void clearFuturesAndService() {
|
||||
while (!futures.isEmpty()) {
|
||||
try {
|
||||
Future<BlockReadStats> future = readService.poll(
|
||||
stripedReadTimeoutInMills, TimeUnit.MILLISECONDS
|
||||
);
|
||||
futures.remove(future);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Clear stale futures from service is interrupted.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void close() {
|
||||
if (zeroStripeBuffers != null) {
|
||||
for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
|
||||
@ -438,9 +453,9 @@ void close() {
|
||||
zeroStripeBuffers = null;
|
||||
|
||||
for (StripedBlockReader reader : readers) {
|
||||
reader.closeBlockReader();
|
||||
reconstructor.freeBuffer(reader.getReadBuffer());
|
||||
reader.freeReadBuffer();
|
||||
reader.closeBlockReader();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
@ -279,4 +280,9 @@ DataNode getDatanode() {
|
||||
public ErasureCodingWorker getErasureCodingWorker() {
|
||||
return erasureCodingWorker;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static ByteBufferPool getBufferPool() {
|
||||
return BUFFER_POOL;
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.BitSet;
|
||||
@ -34,6 +35,12 @@
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
|
||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -574,4 +581,237 @@ public void stripedBlockReconstruction() throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When the StripedBlockReader timeout, the outdated future should be ignored.
|
||||
* Or the NPE will be thrown, which will stop reading the remaining data, and
|
||||
* the reconstruction task will fail.
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutReadBlockInReconstruction() throws Exception {
|
||||
assumeTrue("Ignore case where num parity units <= 1",
|
||||
ecPolicy.getNumParityUnits() > 1);
|
||||
int stripedBufferSize = conf.getInt(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
|
||||
cellSize);
|
||||
ErasureCodingPolicy policy = ecPolicy;
|
||||
fs.enableErasureCodingPolicy(policy.getName());
|
||||
fs.getClient().setErasureCodingPolicy("/", policy.getName());
|
||||
|
||||
// StripedBlockReconstructor#reconstruct will loop 2 times
|
||||
final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
|
||||
String fileName = "/timeout-read-block";
|
||||
Path file = new Path(fileName);
|
||||
writeFile(fs, fileName, fileLen);
|
||||
fs.getFileBlockLocations(file, 0, fileLen);
|
||||
|
||||
LocatedBlocks locatedBlocks =
|
||||
StripedFileTestUtil.getLocatedBlocks(file, fs);
|
||||
Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
|
||||
// The file only has one block group
|
||||
LocatedBlock lblock = locatedBlocks.get(0);
|
||||
DatanodeInfo[] datanodeinfos = lblock.getLocations();
|
||||
|
||||
// to reconstruct first block
|
||||
DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
|
||||
|
||||
int stripedReadTimeoutInMills = conf.getInt(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
|
||||
DFSConfigKeys.
|
||||
DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
|
||||
Assert.assertTrue(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
|
||||
+ " must be greater than 2000",
|
||||
stripedReadTimeoutInMills > 2000);
|
||||
|
||||
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
|
||||
DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
|
||||
private AtomicInteger numDelayReader = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public void delayBlockReader() {
|
||||
int index = numDelayReader.incrementAndGet();
|
||||
LOG.info("Delay the {}th read block", index);
|
||||
|
||||
// the file's first StripedBlockReconstructor#reconstruct,
|
||||
// and the first reader will timeout
|
||||
if (index == 1) {
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> numDelayReader.get() >=
|
||||
ecPolicy.getNumDataUnits() + 1, 50,
|
||||
stripedReadTimeoutInMills * 3
|
||||
);
|
||||
} catch (TimeoutException e) {
|
||||
Assert.fail("Can't reconstruct the file's first part.");
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
// stop all the following re-reconstruction tasks
|
||||
if (index > 3 * ecPolicy.getNumDataUnits() + 1) {
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
DataNodeFaultInjector.set(timeoutInjector);
|
||||
|
||||
try {
|
||||
shutdownDataNode(dataNode);
|
||||
// before HDFS-15240, NPE will cause reconstruction fail(test timeout)
|
||||
StripedFileTestUtil
|
||||
.waitForReconstructionFinished(file, fs, groupSize);
|
||||
} finally {
|
||||
DataNodeFaultInjector.set(oldInjector);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When block reader timeout, the outdated future should be ignored.
|
||||
* Or the ByteBuffer would be wrote after giving back to the BufferPool.
|
||||
* This UT is used to ensure that we should close block reader
|
||||
* before freeing the buffer.
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
|
||||
assumeTrue("Ignore case where num parity units <= 1",
|
||||
ecPolicy.getNumParityUnits() > 1);
|
||||
int stripedBufferSize = conf.getInt(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
|
||||
cellSize);
|
||||
// StripedBlockReconstructor#reconstruct will loop 2 times
|
||||
final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
|
||||
String fileName = "/no-dirty-buffer";
|
||||
Path file = new Path(fileName);
|
||||
writeFile(fs, fileName, fileLen);
|
||||
fs.getFileBlockLocations(file, 0, fileLen);
|
||||
|
||||
LocatedBlocks locatedBlocks =
|
||||
StripedFileTestUtil.getLocatedBlocks(file, fs);
|
||||
Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
|
||||
// The file only has one block group
|
||||
LocatedBlock lblock = locatedBlocks.get(0);
|
||||
DatanodeInfo[] datanodeinfos = lblock.getLocations();
|
||||
|
||||
// to reconstruct first block
|
||||
DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
|
||||
|
||||
int stripedReadTimeoutInMills = conf.getInt(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
|
||||
DFSConfigKeys.
|
||||
DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
|
||||
Assert.assertTrue(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
|
||||
+ " must be greater than 2000",
|
||||
stripedReadTimeoutInMills > 2000);
|
||||
|
||||
ElasticByteBufferPool bufferPool =
|
||||
(ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
|
||||
emptyBufferPool(bufferPool, true);
|
||||
emptyBufferPool(bufferPool, false);
|
||||
|
||||
AtomicInteger finishedReadBlock = new AtomicInteger(0);
|
||||
|
||||
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
|
||||
DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
|
||||
private AtomicInteger numDelayReader = new AtomicInteger(0);
|
||||
private AtomicBoolean continueRead = new AtomicBoolean(false);
|
||||
private AtomicBoolean closeByNPE = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void delayBlockReader() {
|
||||
int index = numDelayReader.incrementAndGet();
|
||||
LOG.info("Delay the {}th read block", index);
|
||||
|
||||
// the file's first StripedBlockReconstructor#reconstruct,
|
||||
// and the first reader will timeout
|
||||
if (index == 1) {
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> numDelayReader.get() >=
|
||||
ecPolicy.getNumDataUnits() + 1, 50,
|
||||
stripedReadTimeoutInMills * 3
|
||||
);
|
||||
} catch (TimeoutException e) {
|
||||
Assert.fail("Can't reconstruct the file's first part.");
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
if (index > ecPolicy.getNumDataUnits() + 1) {
|
||||
try {
|
||||
GenericTestUtils.waitFor(
|
||||
() -> {
|
||||
LOG.info("Close by NPE: {}, continue read: {}",
|
||||
closeByNPE, continueRead);
|
||||
return closeByNPE.get() ? continueRead.get()
|
||||
: index == finishedReadBlock.get() + 1; }, 5,
|
||||
stripedReadTimeoutInMills * 3
|
||||
);
|
||||
} catch (TimeoutException e) {
|
||||
Assert.fail("Can't reconstruct the file's remaining part.");
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void interceptBlockReader() {
|
||||
int n = finishedReadBlock.incrementAndGet();
|
||||
LOG.info("Intercept the end of {}th read block.", n);
|
||||
}
|
||||
|
||||
private AtomicInteger numFreeBuffer = new AtomicInteger(0);
|
||||
@Override
|
||||
public void interceptFreeBlockReaderBuffer() {
|
||||
closeByNPE.compareAndSet(false, true);
|
||||
int num = numFreeBuffer.incrementAndGet();
|
||||
LOG.info("Intercept the {} free block buffer.", num);
|
||||
if (num >= ecPolicy.getNumDataUnits() + 1) {
|
||||
continueRead.compareAndSet(false, true);
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
|
||||
2 * ecPolicy.getNumDataUnits() + 1, 50,
|
||||
stripedReadTimeoutInMills * 3
|
||||
);
|
||||
} catch (TimeoutException e) {
|
||||
Assert.fail("Can't finish the file's reconstruction.");
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
DataNodeFaultInjector.set(timeoutInjector);
|
||||
try {
|
||||
shutdownDataNode(dataNode);
|
||||
// at least one timeout reader
|
||||
GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
|
||||
2 * ecPolicy.getNumDataUnits() + 1, 50,
|
||||
stripedReadTimeoutInMills * 3
|
||||
);
|
||||
|
||||
assertBufferPoolIsEmpty(bufferPool, false);
|
||||
assertBufferPoolIsEmpty(bufferPool, true);
|
||||
StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
|
||||
} finally {
|
||||
DataNodeFaultInjector.set(oldInjector);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
|
||||
boolean direct) {
|
||||
while (bufferPool.size(direct) != 0) {
|
||||
// iterate all ByteBuffers in ElasticByteBufferPool
|
||||
ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
|
||||
Assert.assertEquals(0, byteBuffer.position());
|
||||
}
|
||||
}
|
||||
|
||||
private void emptyBufferPool(ElasticByteBufferPool bufferPool,
|
||||
boolean direct) {
|
||||
while (bufferPool.size(direct) != 0) {
|
||||
bufferPool.getBuffer(direct, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,30 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
|
||||
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
|
||||
public final class ErasureCodingTestHelper {
|
||||
|
||||
private ErasureCodingTestHelper() { }
|
||||
|
||||
public static ByteBufferPool getBufferPool() {
|
||||
return StripedReconstructor.getBufferPool();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user