diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 93446c2117..a317133301 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1386,19 +1386,11 @@ private void addDatanode2ExistingPipeline() throws IOException {
      * Case 2: Failure in Streaming
      * - Append/Create:
      *    + transfer RBW
-     *
-     * Case 3: Failure in Close
-     * - Append/Create:
-     *    + no transfer, let NameNode replicates the block.
      */
     if (!isAppend && lastAckedSeqno < 0
         && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
       //no data have been written
       return;
-    } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
-        || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-      //pipeline is closing
-      return;
     }
 
     int tried = 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 248bc9fcc9..c9eb20158a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1492,6 +1492,8 @@ public void run() {
           if (lastPacketInBlock) {
             // Finalize the block and close the block file
             finalizeBlock(startTime);
+            // For test only, no-op in production system.
+            DataNodeFaultInjector.get().delayAckLastPacket();
           }
 
           Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index e3b9cf0399..03cd4e97ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -68,6 +68,12 @@ public void delaySendingAckToUpstream(final String upstreamAddr)
       throws IOException {
   }
 
+  /**
+   * Used as a hook to delay sending the response of the last packet.
+   */
+  public void delayAckLastPacket() throws IOException {
+  }
+
   /**
    * Used as a hook to delay writing a packet to disk.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 873af8b505..3b766f930a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -19,12 +19,14 @@
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
@@ -39,6 +41,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -800,4 +803,94 @@ public void testUpdatePipeLineAfterDNReg()throws Exception {
       }
     }
   }
+
+  @Test
+  public void testAddingDatanodeDuringClosing() throws Exception {
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public void delayAckLastPacket() throws IOException {
+        try {
+          // Makes the PIPELINE_CLOSE stage longer.
+          Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+          throw new IOException("Interrupted while sleeping");
+        }
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testAddingDatanodeDuringClosing");
+      FSDataOutputStream out = fileSys.create(file);
+      byte[] buffer = new byte[128 * 1024];
+      out.write(buffer);
+      // Wait for the pipeline to be built successfully.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          if (((DFSOutputStream) out.getWrappedStream()).getStreamer()
+              .getNodes() != null) {
+            return true;
+          }
+          return false;
+        }
+      }, 100, 3000);
+
+      // Get three datanodes on the pipeline.
+      DatanodeInfo[] pipeline =
+          ((DFSOutputStream) out.getWrappedStream()).getStreamer().getNodes();
+      DataNode[] dataNodes = new DataNode[3];
+      int i = 0;
+      for (DatanodeInfo info : pipeline) {
+        for (DataNode dn : cluster.getDataNodes()) {
+          if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
+            dataNodes[i++] = dn;
+            break;
+          }
+        }
+      }
+
+      // Shutdown the first datanode. According to the default replacement
+      // strategy, no datanode will be added to existing pipeline.
+      dataNodes[0].shutdown();
+
+      // Shutdown the second datanode when the pipeline is closing.
+      new Thread(() -> {
+        try {
+          GenericTestUtils.waitFor(new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+              if (((DFSOutputStream) out.getWrappedStream()).getStreamer()
+                  .getStage() == BlockConstructionStage.PIPELINE_CLOSE) {
+                return true;
+              }
+              return false;
+            }
+          }, 100, 10000);
+        } catch (TimeoutException | InterruptedException e) {
+          e.printStackTrace();
+        }
+        dataNodes[1].shutdown();
+      }).start();
+      out.close();
+      // Shutdown the third datanode.
+      dataNodes[2].shutdown();
+      // Check if we can read the file successfully.
+      DFSTestUtil.readFile(fileSys, file);
+    } catch (BlockMissingException e) {
+      fail("The file can not be read! " + e);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+    }
+  }
 }
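
Note on the behavior change (commentary, not part of the patch): previously, a datanode failure while the pipeline was in PIPELINE_CLOSE or PIPELINE_CLOSE_RECOVERY returned early from addDatanode2ExistingPipeline() and relied on the NameNode to re-replicate the finalized block; if the remaining replicas were then lost as well, the block became unreadable, which is the BlockMissingException the test guards against. With the early return removed, a failure during close goes through the same datanode-replacement path as a streaming failure, so the standard client-side replace-datanode-on-failure settings govern it. Below is a minimal sketch of those settings; the class name and output path are illustrative only, and fs.defaultFS is assumed to point at a running HDFS cluster.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReplaceDatanodeOnCloseSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Let the client add a replacement datanode when one fails
        // mid-pipeline (default: true).
        conf.setBoolean(
            "dfs.client.block.write.replace-datanode-on-failure.enable", true);
        // DEFAULT adds a replacement only when the pipeline has shrunk below
        // what the policy considers safe; ALWAYS and NEVER are the other
        // options.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy",
            "DEFAULT");
        // When true, the write continues even if adding a replacement fails.
        conf.setBoolean(
            "dfs.client.block.write.replace-datanode-on-failure.best-effort",
            false);
        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream out = fs.create(new Path("/tmp/sketch"))) {
          out.write(new byte[128 * 1024]);
          // close() at the end of try-with-resources drives PIPELINE_CLOSE;
          // with this patch, a datanode failing here can be replaced instead
          // of the transfer being skipped.
        }
      }
    }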