diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index f5ce0ff2d6..838da7efe9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1620,7 +1620,8 @@ void updateBlockGS(final long newGS) {
   }
 
   /** update pipeline at the namenode */
-  private void updatePipeline(long newGS) throws IOException {
+  @VisibleForTesting
+  public void updatePipeline(long newGS) throws IOException {
     final ExtendedBlock oldBlock = block.getCurrentBlock();
     // the new GS has been propagated to all DN, it should be ok to update the
     // local block state
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 0212c4e911..3f8c7f7796 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -38,6 +38,9 @@
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
@@ -706,4 +709,49 @@ public void failPipeline(ReplicaInPipeline replicaInfo,
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testUpdatePipeLineAfterDNReg() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testUpdatePipeLineAfterDNReg");
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(1);
+      out.hflush();
+      // Take the first DN, disable its heartbeats, then mark it dead
+      DataNode dn1 = cluster.getDataNodes().get(0);
+      dn1.setHeartbeatsDisabledForTests(true);
+      DatanodeDescriptor dn1Desc = cluster.getNamesystem(0).getBlockManager()
+          .getDatanodeManager().getDatanode(dn1.getDatanodeId());
+      cluster.setDataNodeDead(dn1Desc);
+      // Re-register the dead DataNode with the NameNode
+      DatanodeProtocolClientSideTranslatorPB dnp =
+          new DatanodeProtocolClientSideTranslatorPB(
+              cluster.getNameNode().getNameNodeAddress(), conf);
+      dnp.registerDatanode(
+          dn1.getDNRegistrationForBP(cluster.getNamesystem().getBlockPoolId()));
+      DFSOutputStream dfsO = (DFSOutputStream) out.getWrappedStream();
+      String clientName = ((DistributedFileSystem) fileSys).getClient()
+          .getClientName();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
+      // Bump the generation stamp and update the pipeline
+      LocatedBlock newBlock = namenode
+          .updateBlockForPipeline(dfsO.getBlock(), clientName);
+      dfsO.getStreamer()
+          .updatePipeline(newBlock.getBlock().getGenerationStamp());
+      newBlock = namenode.updateBlockForPipeline(dfsO.getBlock(), clientName);
+      // Should not throw; the second pipeline update should also succeed
+      dfsO.getStreamer()
+          .updatePipeline(newBlock.getBlock().getGenerationStamp());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
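
For reviewers: the test declares the first DataNode dead and then re-registers it with the NameNode mid-write, after which two rounds of updateBlockForPipeline/updatePipeline are driven through the streamer hook that the @VisibleForTesting change exposes. A minimal sketch of running just this test, assuming a standard Hadoop dev setup with Maven and the module path taken from the diff headers:

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestClientProtocolForPipelineRecovery#testUpdatePipeLineAfterDNReg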