diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 18505ed513..5b40356bdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -367,6 +367,8 @@ Release 2.5.0 - UNRELEASED
     HDFS-6153. Document "fileId" and "childrenNum" fields in the FileStatus
     Json schema (Akira Ajisaka via vinayakumarb)
 
+    HDFS-6178. Decommission on standby NN couldn't finish. (Ming Ma via jing9)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index cdaf97ab24..0bcc82fb3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2999,10 +2999,14 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
 
   /**
    * On stopping decommission, check if the node has excess replicas.
-   * If there are any excess replicas, call processOverReplicatedBlock()
+   * If there are any excess replicas, call processOverReplicatedBlock().
+   * Process over replicated blocks only when active NN is out of safe mode.
    */
   void processOverReplicatedBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     int numOverReplicated = 0;
     while(it.hasNext()) {
@@ -3068,11 +3072,13 @@ boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
         }
       }
       if (!neededReplications.contains(block) &&
-          pendingReplications.getNumReplicas(block) == 0) {
+          pendingReplications.getNumReplicas(block) == 0 &&
+          namesystem.isPopulatingReplQueues()) {
         //
         // These blocks have been reported from the datanode
         // after the startDecommission method has been executed. These
         // blocks were in flight when the decommissioning was started.
+        // Process these blocks only when active NN is out of safe mode.
         //
         neededReplications.add(block,
                                curReplicas,
@@ -3344,8 +3350,11 @@ private class ReplicationMonitor implements Runnable {
     public void run() {
       while (namesystem.isRunning()) {
         try {
-          computeDatanodeWork();
-          processPendingReplications();
+          // Process replication work only when active NN is out of safe mode.
+          if (namesystem.isPopulatingReplQueues()) {
+            computeDatanodeWork();
+            processPendingReplications();
+          }
           Thread.sleep(replicationRecheckInterval);
         } catch (Throwable t) {
           if (!namesystem.isRunning()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index afba5cad42..b9eab7f086 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -42,7 +42,9 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.test.PathUtils;
@@ -202,10 +204,11 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
   }
 
   /*
-   * decommission one random node and wait for each to reach the
-   * given {@code waitForState}.
+   * decommission the DN identified by datanodeUuid, or one random node if it
+   * is null, and wait for the node to reach the given {@code waitForState}.
    */
   private DatanodeInfo decommissionNode(int nnIndex,
+                                  String datanodeUuid,
                                   ArrayList<DatanodeInfo> decommissionedNodes,
                                   AdminStates waitForState)
     throws IOException {
@@ -213,14 +216,26 @@ private DatanodeInfo decommissionNode(int nnIndex,
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
 
     //
-    // pick one datanode randomly.
+    // pick one datanode randomly unless the caller specifies one.
     //
     int index = 0;
-    boolean found = false;
-    while (!found) {
-      index = myrand.nextInt(info.length);
-      if (!info[index].isDecommissioned()) {
-        found = true;
+    if (datanodeUuid == null) {
+      boolean found = false;
+      while (!found) {
+        index = myrand.nextInt(info.length);
+        if (!info[index].isDecommissioned()) {
+          found = true;
+        }
+      }
+    } else {
+      // The caller specifies a DN
+      for (; index < info.length; index++) {
+        if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
+          break;
+        }
+      }
+      if (index == info.length) {
+        throw new IOException("invalid datanodeUuid " + datanodeUuid);
       }
     }
     String nodename = info[index].getXferAddr();
@@ -242,11 +257,13 @@ private DatanodeInfo decommissionNode(int nnIndex,
     return ret;
   }
 
-  /* stop decommission of the datanode and wait for each to reach the NORMAL state */
-  private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
+  /* Ask a specific NN to stop decommission of the datanode and wait for it
+   * to reach the NORMAL state.
+   */
+  private void recomissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
     LOG.info("Recommissioning node: " + decommissionedNode);
     writeConfigFile(excludeFile, null);
-    refreshNodes(cluster.getNamesystem(), conf);
+    refreshNodes(cluster.getNamesystem(nnIndex), conf);
     waitNodeState(decommissionedNode, AdminStates.NORMAL);
   }
 
@@ -367,7 +384,7 @@ public void testDecommission2() throws IOException {
     int liveDecomissioned = ns.getNumDecomLiveDataNodes();
 
     // Decommission one node. Verify that node is decommissioned.
-    DatanodeInfo decomNode = decommissionNode(0, decommissionedNodes,
+    DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes,
         AdminStates.DECOMMISSIONED);
     decommissionedNodes.add(decomNode);
     assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
@@ -403,7 +420,130 @@ public void testRecommission() throws IOException {
   public void testDecommissionFederation() throws IOException {
     testDecommission(2, 2);
   }
-  
+
+  /**
+   * Test decommission process on standby NN.
+   * Verify admins can run "dfsadmin -refreshNodes" on SBN and decomm
+   * process can finish as long as admins run "dfsadmin -refreshNodes"
+   * on active NN.
+   * SBN used to mark excess replica upon recommission. The SBN's pick
+   * for excess replica could be different from the one picked by ANN.
+   * That creates inconsistent state and prevents SBN from finishing
+   * decommission.
+   */
+  @Test(timeout=360000)
+  public void testDecommissionOnStandby() throws Exception {
+    Configuration hdfsConf = new HdfsConfiguration(conf);
+    hdfsConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30000);
+    hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2);
+
+    // The time to wait so that the slow DN's heartbeat is considered old
+    // by BlockPlacementPolicyDefault and thus will choose that DN for
+    // excess replica.
+    long slowHeartbeatDNwaitTime =
+        hdfsConf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (hdfsConf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1);
+
+    cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3).build();
+
+    cluster.transitionToActive(0);
+    cluster.waitActive();
+
+
+    // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs.
+    // The last DN is empty. Also configure the last DN to have slow heartbeat
+    // so that it will be chosen as excess replica candidate during recommission.
+
+    // Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
+    // same as # of DNs, each DN will have a replica for any block.
+    Path file1 = new Path("testDecommissionHA.dat");
+    int replicas = 3;
+    FileSystem activeFileSys = cluster.getFileSystem(0);
+    writeFile(activeFileSys, file1, replicas);
+
+    HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+        cluster.getNameNode(1));
+
+    // Step 1.b, start a DN with slow heartbeat, so that we can know for sure it
+    // will be chosen as the target of excess replica during recommission.
+    hdfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
+    cluster.startDataNodes(hdfsConf, 1, true, null, null, null);
+    DataNode lastDN = cluster.getDataNodes().get(3);
+    lastDN.getDatanodeUuid();
+
+    // Step 2, decommission the first DN at both ANN and SBN.
+    DataNode firstDN = cluster.getDataNodes().get(0);
+
+    // Step 2.a, ask ANN to decomm the first DN
+    DatanodeInfo decommissionedNodeFromANN = decommissionNode(
+        0, firstDN.getDatanodeUuid(), null, AdminStates.DECOMMISSIONED);
+
+    // Step 2.b, ask SBN to decomm the first DN
+    DatanodeInfo decomNodeFromSBN = decommissionNode(1, firstDN.getDatanodeUuid(), null,
+        AdminStates.DECOMMISSIONED);
+
+    // Step 3, recommission the first DN on SBN and ANN to create excess replica
+    // It recommissions the node on SBN first to create potential
+    // inconsistent state. In a production cluster, such inconsistent state can
+    // happen even if the recommission command was issued on ANN first, given
+    // the async nature of the system.
+
+    // Step 3.a, ask SBN to recomm the first DN.
+    // SBN has been fixed so that it no longer invalidates excess replica during
+    // recommission.
+    // Before the fix, SBN could get into the following state.
+    //    1. the last DN would have been chosen as excess replica, given its
+    //    heartbeat is considered old.
+    //    Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
+    //    2. After recomissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
+    //    and one excess replica ( 3 )
+    // After the fix,
+    //    After recomissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
+    Thread.sleep(slowHeartbeatDNwaitTime);
+    recomissionNode(1, decomNodeFromSBN);
+
+    // Step 3.b, ask ANN to recommission the first DN.
+    // To verify the fix, the test makes sure the excess replica picked by ANN
+    // is different from the one picked by SBN before the fix.
+    // To achieve that, we make sure next-to-last DN is chosen as excess replica
+    // by ANN.
+    // 1. restore LastDNprop's heartbeat interval.
+    // 2. Make next-to-last DN's heartbeat slow.
+    MiniDFSCluster.DataNodeProperties LastDNprop = cluster.stopDataNode(3);
+    LastDNprop.conf.setLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster.restartDataNode(LastDNprop);
+
+    MiniDFSCluster.DataNodeProperties nextToLastDNprop = cluster.stopDataNode(2);
+    nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
+    cluster.restartDataNode(nextToLastDNprop);
+    cluster.waitActive();
+    Thread.sleep(slowHeartbeatDNwaitTime);
+    recomissionNode(0, decommissionedNodeFromANN);
+
+    // Step 3.c, make sure the DN has deleted the block and report to NNs
+    cluster.triggerHeartbeats();
+    HATestUtil.waitForDNDeletions(cluster);
+    cluster.triggerDeletionReports();
+
+    // Step 4, decommission the first DN on both ANN and SBN
+    // With the fix to make sure SBN no longer marks excess replica
+    // during recommission, SBN's decommission can finish properly
+    decommissionNode(0, firstDN.getDatanodeUuid(), null,
+        AdminStates.DECOMMISSIONED);
+
+    // Ask SBN to decomm the first DN
+    decommissionNode(1, firstDN.getDatanodeUuid(), null,
+        AdminStates.DECOMMISSIONED);
+
+    cluster.shutdown();
+
+  }
+
   private void testDecommission(int numNamenodes, int numDatanodes)
       throws IOException {
     LOG.info("Starting test testDecommission");
@@ -430,7 +570,7 @@ private void testDecommission(int numNamenodes, int numDatanodes)
       int liveDecomissioned = ns.getNumDecomLiveDataNodes();
 
       // Decommission one node. Verify that node is decommissioned.
-      DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
+      DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
           AdminStates.DECOMMISSIONED);
       decommissionedNodes.add(decomNode);
       assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
@@ -458,7 +598,7 @@ private void testDecommission(int numNamenodes, int numDatanodes)
       }
     }
 
-    // Restart the cluster and ensure recommissioned datanodes
+    // Restart the cluster and ensure decommissioned datanodes
     // are allowed to register with the namenode
     cluster.shutdown();
     startCluster(numNamenodes, numDatanodes, conf);
@@ -486,7 +626,7 @@ private void testRecommission(int numNamenodes, int numDatanodes)
       writeFile(fileSys, file1, replicas);
 
       // Decommission one node. Verify that node is decommissioned.
-      DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
+      DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
           AdminStates.DECOMMISSIONED);
       decommissionedNodes.add(decomNode);
 
@@ -510,7 +650,7 @@ private void testRecommission(int numNamenodes, int numDatanodes)
           + tries + " times.", tries < 20);
 
       // stop decommission and check if the new replicas are removed
-      recomissionNode(decomNode);
+      recomissionNode(0, decomNode);
       // wait for the block to be deleted
       tries = 0;
       while (tries++ < 20) {
@@ -561,7 +701,7 @@ public void testClusterStats(int numNameNodes) throws IOException,
       FSNamesystem fsn = cluster.getNamesystem(i);
       NameNode namenode = cluster.getNameNode(i);
-      DatanodeInfo downnode = decommissionNode(i, null,
+      DatanodeInfo downnode = decommissionNode(i, null, null,
          AdminStates.DECOMMISSION_INPROGRESS);
       // Check namenode stats for multiple datanode heartbeats
       verifyStats(namenode, fsn, downnode, true);
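
For reference, the three BlockManager changes above all apply the same guard: replication-queue work is skipped whenever the namesystem's isPopulatingReplQueues() returns false, which is the case on a standby NN or on an NN that is still in safe mode. Below is a condensed, illustrative Java sketch of that pattern; it is not part of the patch, and the class and interface names are simplified stand-ins rather than the real Hadoop types.

// Illustrative sketch only -- mirrors the guard the patch adds to
// ReplicationMonitor#run (and, as an early return / extra condition, to
// processOverReplicatedBlocksOnReCommission and isReplicationInProgress).
class ReplicationMonitorSketch implements Runnable {

  /** Minimal view of the namesystem state the monitor needs (assumed names). */
  interface Namesystem {
    boolean isRunning();
    // True only on an active NN that has left safe mode.
    boolean isPopulatingReplQueues();
  }

  private final Namesystem namesystem;
  private final long recheckIntervalMs;

  ReplicationMonitorSketch(Namesystem namesystem, long recheckIntervalMs) {
    this.namesystem = namesystem;
    this.recheckIntervalMs = recheckIntervalMs;
  }

  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        // A standby NN (or an NN still in safe mode) does not own replication
        // decisions, so it skips the work rather than computing replication
        // and invalidation state that could diverge from the active NN.
        if (namesystem.isPopulatingReplQueues()) {
          computeDatanodeWork();
          processPendingReplications();
        }
        Thread.sleep(recheckIntervalMs);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  private void computeDatanodeWork() {
    // placeholder: schedule replication / deletion work for datanodes
  }

  private void processPendingReplications() {
    // placeholder: re-queue replications that timed out
  }
}

This is why the test recommissions the DN on the SBN first: before the fix, the SBN would pick its own excess replica to invalidate, and that pick could differ from the ANN's, leaving the SBN unable to finish a later decommission.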