HDFS-7996. After swapping a volume, BlockReceiver reports ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)
This commit is contained in:
parent
932730df7d
commit
023133cef9
@ -1373,6 +1373,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-8039. Fix TestDebugAdmin#testRecoverLease and
|
||||
testVerifyBlockChecksumCommand on Windows. (Xiaoyu Yao via cnauroth)
|
||||
|
||||
HDFS-7996. After swapping a volume, BlockReceiver reports
|
||||
ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)
|
||||
|
||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||
|
@ -281,7 +281,7 @@ String getStorageUuid() {
|
||||
}
|
||||
|
||||
/**
|
||||
* close files.
|
||||
* close files and release volume reference.
|
||||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
@ -798,17 +798,20 @@ void receiveBlock(
|
||||
// then finalize block or convert temporary to RBW.
|
||||
// For client-writes, the block is finalized in the PacketResponder.
|
||||
if (isDatanode || isTransfer) {
|
||||
// close the block/crc files
|
||||
close();
|
||||
block.setNumBytes(replicaInfo.getNumBytes());
|
||||
// Hold a volume reference to finalize block.
|
||||
try (ReplicaHandler handler = claimReplicaHandler()) {
|
||||
// close the block/crc files
|
||||
close();
|
||||
block.setNumBytes(replicaInfo.getNumBytes());
|
||||
|
||||
if (stage == BlockConstructionStage.TRANSFER_RBW) {
|
||||
// for TRANSFER_RBW, convert temporary to RBW
|
||||
datanode.data.convertTemporaryToRbw(block);
|
||||
} else {
|
||||
// for isDatnode or TRANSFER_FINALIZED
|
||||
// Finalize the block.
|
||||
datanode.data.finalizeBlock(block);
|
||||
if (stage == BlockConstructionStage.TRANSFER_RBW) {
|
||||
// for TRANSFER_RBW, convert temporary to RBW
|
||||
datanode.data.convertTemporaryToRbw(block);
|
||||
} else {
|
||||
// for isDatnode or TRANSFER_FINALIZED
|
||||
// Finalize the block.
|
||||
datanode.data.finalizeBlock(block);
|
||||
}
|
||||
}
|
||||
datanode.metrics.incrBlocksWritten();
|
||||
}
|
||||
@ -981,6 +984,13 @@ private Checksum computePartialChunkCrc(long blkoff, long ckoff)
|
||||
return partialCrc;
|
||||
}
|
||||
|
||||
/** The caller claims the ownership of the replica handler. */
|
||||
private ReplicaHandler claimReplicaHandler() {
|
||||
ReplicaHandler handler = replicaHandler;
|
||||
replicaHandler = null;
|
||||
return handler;
|
||||
}
|
||||
|
||||
private static enum PacketResponderType {
|
||||
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
|
||||
}
|
||||
@ -1280,11 +1290,14 @@ public void run() {
|
||||
* @param startTime time when BlockReceiver started receiving the block
|
||||
*/
|
||||
private void finalizeBlock(long startTime) throws IOException {
|
||||
BlockReceiver.this.close();
|
||||
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
|
||||
: 0;
|
||||
block.setNumBytes(replicaInfo.getNumBytes());
|
||||
datanode.data.finalizeBlock(block);
|
||||
long endTime = 0;
|
||||
// Hold a volume reference to finalize block.
|
||||
try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler()) {
|
||||
BlockReceiver.this.close();
|
||||
endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
||||
block.setNumBytes(replicaInfo.getNumBytes());
|
||||
datanode.data.finalizeBlock(block);
|
||||
}
|
||||
|
||||
if (pinning) {
|
||||
datanode.data.setPinning(block);
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
@ -64,6 +65,8 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.hamcrest.CoreMatchers.anyOf;
|
||||
@ -77,6 +80,7 @@
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
|
||||
public class TestDataNodeHotSwapVolumes {
|
||||
@ -577,6 +581,7 @@ private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
|
||||
final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
final Path testFile = new Path("/test");
|
||||
final long lastTimeDiskErrorCheck = dn.getLastDiskErrorCheck();
|
||||
|
||||
FSDataOutputStream out = fs.create(testFile, REPLICATION);
|
||||
|
||||
@ -586,6 +591,23 @@ private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
|
||||
out.write(writeBuf);
|
||||
out.hflush();
|
||||
|
||||
// Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
|
||||
// BlockReceiver releases volume reference before finalizeBlock(), the blocks
|
||||
// on the volume will be removed, and finalizeBlock() throws IOE.
|
||||
final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
|
||||
dn.data = Mockito.spy(data);
|
||||
doAnswer(new Answer<Object>() {
|
||||
public Object answer(InvocationOnMock invocation)
|
||||
throws IOException, InterruptedException {
|
||||
Thread.sleep(1000);
|
||||
// Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
|
||||
// the block is not removed, since the volume reference should not
|
||||
// be released at this point.
|
||||
data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]);
|
||||
return null;
|
||||
}
|
||||
}).when(dn.data).finalizeBlock(any(ExtendedBlock.class));
|
||||
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
List<String> oldDirs = getDataDirs(dn);
|
||||
@ -612,13 +634,19 @@ public void run() {
|
||||
out.hflush();
|
||||
out.close();
|
||||
|
||||
reconfigThread.join();
|
||||
|
||||
// Verify the file has sufficient replications.
|
||||
DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
|
||||
// Read the content back
|
||||
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
|
||||
assertEquals(BLOCK_SIZE, content.length);
|
||||
|
||||
reconfigThread.join();
|
||||
// If an IOException thrown from BlockReceiver#run, it triggers
|
||||
// DataNode#checkDiskError(). So we can test whether checkDiskError() is called,
|
||||
// to see whether there is IOException in BlockReceiver#run().
|
||||
assertEquals(lastTimeDiskErrorCheck, dn.getLastDiskErrorCheck());
|
||||
|
||||
if (!exceptions.isEmpty()) {
|
||||
throw new IOException(exceptions.get(0).getCause());
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user