HDFS-6178. Decommission on standby NN couldn't finish. Contributed by Ming Ma.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589002 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-04-21 23:35:09 +00:00
parent 84388525a3
commit 1be4ddef9e
3 changed files with 173 additions and 22 deletions

View File

@ -367,6 +367,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6153. Document "fileId" and "childrenNum" fields in the FileStatus Json schema HDFS-6153. Document "fileId" and "childrenNum" fields in the FileStatus Json schema
(Akira Ajisaka via vinayakumarb) (Akira Ajisaka via vinayakumarb)
HDFS-6178. Decommission on standby NN couldn't finish. (Ming Ma via jing9)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -2999,10 +2999,14 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
/** /**
* On stopping decommission, check if the node has excess replicas. * On stopping decommission, check if the node has excess replicas.
* If there are any excess replicas, call processOverReplicatedBlock() * If there are any excess replicas, call processOverReplicatedBlock().
* Process over replicated blocks only when active NN is out of safe mode.
*/ */
void processOverReplicatedBlocksOnReCommission( void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) { final DatanodeDescriptor srcNode) {
if (!namesystem.isPopulatingReplQueues()) {
return;
}
final Iterator<? extends Block> it = srcNode.getBlockIterator(); final Iterator<? extends Block> it = srcNode.getBlockIterator();
int numOverReplicated = 0; int numOverReplicated = 0;
while(it.hasNext()) { while(it.hasNext()) {
@ -3068,11 +3072,13 @@ boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
} }
} }
if (!neededReplications.contains(block) && if (!neededReplications.contains(block) &&
pendingReplications.getNumReplicas(block) == 0) { pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) {
// //
// These blocks have been reported from the datanode // These blocks have been reported from the datanode
// after the startDecommission method has been executed. These // after the startDecommission method has been executed. These
// blocks were in flight when the decommissioning was started. // blocks were in flight when the decommissioning was started.
// Process these blocks only when active NN is out of safe mode.
// //
neededReplications.add(block, neededReplications.add(block,
curReplicas, curReplicas,
@ -3344,8 +3350,11 @@ private class ReplicationMonitor implements Runnable {
public void run() { public void run() {
while (namesystem.isRunning()) { while (namesystem.isRunning()) {
try { try {
computeDatanodeWork(); // Process replication work only when active NN is out of safe mode.
processPendingReplications(); if (namesystem.isPopulatingReplQueues()) {
computeDatanodeWork();
processPendingReplications();
}
Thread.sleep(replicationRecheckInterval); Thread.sleep(replicationRecheckInterval);
} catch (Throwable t) { } catch (Throwable t) {
if (!namesystem.isRunning()) { if (!namesystem.isRunning()) {

View File

@ -42,7 +42,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
@ -202,10 +204,11 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
} }
/* /*
* decommission one random node and wait for each to reach the * decommission the DN at index dnIndex or one random node if dnIndex is set
* given {@code waitForState}. * to -1 and wait for the node to reach the given {@code waitForState}.
*/ */
private DatanodeInfo decommissionNode(int nnIndex, private DatanodeInfo decommissionNode(int nnIndex,
String datanodeUuid,
ArrayList<DatanodeInfo>decommissionedNodes, ArrayList<DatanodeInfo>decommissionedNodes,
AdminStates waitForState) AdminStates waitForState)
throws IOException { throws IOException {
@ -213,14 +216,26 @@ private DatanodeInfo decommissionNode(int nnIndex,
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
// //
// pick one datanode randomly. // pick one datanode randomly unless the caller specifies one.
// //
int index = 0; int index = 0;
boolean found = false; if (datanodeUuid == null) {
while (!found) { boolean found = false;
index = myrand.nextInt(info.length); while (!found) {
if (!info[index].isDecommissioned()) { index = myrand.nextInt(info.length);
found = true; if (!info[index].isDecommissioned()) {
found = true;
}
}
} else {
// The caller specifies a DN
for (; index < info.length; index++) {
if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
break;
}
}
if (index == info.length) {
throw new IOException("invalid datanodeUuid " + datanodeUuid);
} }
} }
String nodename = info[index].getXferAddr(); String nodename = info[index].getXferAddr();
@ -242,11 +257,13 @@ private DatanodeInfo decommissionNode(int nnIndex,
return ret; return ret;
} }
/* stop decommission of the datanode and wait for each to reach the NORMAL state */ /* Ask a specific NN to stop decommission of the datanode and wait for each
private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException { * to reach the NORMAL state.
*/
private void recomissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
LOG.info("Recommissioning node: " + decommissionedNode); LOG.info("Recommissioning node: " + decommissionedNode);
writeConfigFile(excludeFile, null); writeConfigFile(excludeFile, null);
refreshNodes(cluster.getNamesystem(), conf); refreshNodes(cluster.getNamesystem(nnIndex), conf);
waitNodeState(decommissionedNode, AdminStates.NORMAL); waitNodeState(decommissionedNode, AdminStates.NORMAL);
} }
@ -367,7 +384,7 @@ public void testDecommission2() throws IOException {
int liveDecomissioned = ns.getNumDecomLiveDataNodes(); int liveDecomissioned = ns.getNumDecomLiveDataNodes();
// Decommission one node. Verify that node is decommissioned. // Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = decommissionNode(0, decommissionedNodes, DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes,
AdminStates.DECOMMISSIONED); AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode); decommissionedNodes.add(decomNode);
assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
@ -404,6 +421,129 @@ public void testDecommissionFederation() throws IOException {
testDecommission(2, 2); testDecommission(2, 2);
} }
/**
* Test decommission process on standby NN.
* Verify admins can run "dfsadmin -refreshNodes" on SBN and decomm
* process can finish as long as admins run "dfsadmin -refreshNodes"
* on active NN.
* SBN used to mark excess replica upon recommission. The SBN's pick
* for excess replica could be different from the one picked by ANN.
* That creates inconsistent state and prevent SBN from finishing
* decommission.
*/
@Test(timeout=360000)
public void testDecommissionOnStandby() throws Exception {
Configuration hdfsConf = new HdfsConfiguration(conf);
hdfsConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30000);
hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2);
// The time to wait so that the slow DN's heartbeat is considered old
// by BlockPlacementPolicyDefault and thus will choose that DN for
// excess replica.
long slowHeartbeatDNwaitTime =
hdfsConf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (hdfsConf.getInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1);
cluster = new MiniDFSCluster.Builder(hdfsConf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3).build();
cluster.transitionToActive(0);
cluster.waitActive();
// Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs.
// The last DN is empty. Also configure the last DN to have slow heartbeat
// so that it will be chosen as excess replica candidate during recommission.
// Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
// same as # of DNs, each DN will have a replica for any block.
Path file1 = new Path("testDecommissionHA.dat");
int replicas = 3;
FileSystem activeFileSys = cluster.getFileSystem(0);
writeFile(activeFileSys, file1, replicas);
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
cluster.getNameNode(1));
// Step 1.b, start a DN with slow heartbeat, so that we can know for sure it
// will be chosen as the target of excess replica during recommission.
hdfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
cluster.startDataNodes(hdfsConf, 1, true, null, null, null);
DataNode lastDN = cluster.getDataNodes().get(3);
lastDN.getDatanodeUuid();
// Step 2, decommission the first DN at both ANN and SBN.
DataNode firstDN = cluster.getDataNodes().get(0);
// Step 2.a, ask ANN to decomm the first DN
DatanodeInfo decommissionedNodeFromANN = decommissionNode(
0, firstDN.getDatanodeUuid(), null, AdminStates.DECOMMISSIONED);
// Step 2.b, ask SBN to decomm the first DN
DatanodeInfo decomNodeFromSBN = decommissionNode(1, firstDN.getDatanodeUuid(), null,
AdminStates.DECOMMISSIONED);
// Step 3, recommission the first DN on SBN and ANN to create excess replica
// It recommissions the node on SBN first to create potential
// inconsistent state. In production cluster, such insistent state can happen
// even if recommission command was issued on ANN first given the async nature
// of the system.
// Step 3.a, ask SBN to recomm the first DN.
// SBN has been fixed so that it no longer invalidates excess replica during
// recommission.
// Before the fix, SBN could get into the following state.
// 1. the last DN would have been chosen as excess replica, given its
// heartbeat is considered old.
// Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
// 2. After recomissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
// and one excess replica ( 3 )
// After the fix,
// After recomissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
Thread.sleep(slowHeartbeatDNwaitTime);
recomissionNode(1, decomNodeFromSBN);
// Step 3.b, ask ANN to recommission the first DN.
// To verify the fix, the test makes sure the excess replica picked by ANN
// is different from the one picked by SBN before the fix.
// To achieve that, we make sure next-to-last DN is chosen as excess replica
// by ANN.
// 1. restore LastDNprop's heartbeat interval.
// 2. Make next-to-last DN's heartbeat slow.
MiniDFSCluster.DataNodeProperties LastDNprop = cluster.stopDataNode(3);
LastDNprop.conf.setLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
cluster.restartDataNode(LastDNprop);
MiniDFSCluster.DataNodeProperties nextToLastDNprop = cluster.stopDataNode(2);
nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
cluster.restartDataNode(nextToLastDNprop);
cluster.waitActive();
Thread.sleep(slowHeartbeatDNwaitTime);
recomissionNode(0, decommissionedNodeFromANN);
// Step 3.c, make sure the DN has deleted the block and report to NNs
cluster.triggerHeartbeats();
HATestUtil.waitForDNDeletions(cluster);
cluster.triggerDeletionReports();
// Step 4, decommission the first DN on both ANN and SBN
// With the fix to make sure SBN no longer marks excess replica
// during recommission, SBN's decommission can finish properly
decommissionNode(0, firstDN.getDatanodeUuid(), null,
AdminStates.DECOMMISSIONED);
// Ask SBN to decomm the first DN
decommissionNode(1, firstDN.getDatanodeUuid(), null,
AdminStates.DECOMMISSIONED);
cluster.shutdown();
}
private void testDecommission(int numNamenodes, int numDatanodes) private void testDecommission(int numNamenodes, int numDatanodes)
throws IOException { throws IOException {
LOG.info("Starting test testDecommission"); LOG.info("Starting test testDecommission");
@ -430,7 +570,7 @@ private void testDecommission(int numNamenodes, int numDatanodes)
int liveDecomissioned = ns.getNumDecomLiveDataNodes(); int liveDecomissioned = ns.getNumDecomLiveDataNodes();
// Decommission one node. Verify that node is decommissioned. // Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes, DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
AdminStates.DECOMMISSIONED); AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode); decommissionedNodes.add(decomNode);
assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
@ -458,7 +598,7 @@ private void testDecommission(int numNamenodes, int numDatanodes)
} }
} }
// Restart the cluster and ensure recommissioned datanodes // Restart the cluster and ensure decommissioned datanodes
// are allowed to register with the namenode // are allowed to register with the namenode
cluster.shutdown(); cluster.shutdown();
startCluster(numNamenodes, numDatanodes, conf); startCluster(numNamenodes, numDatanodes, conf);
@ -486,7 +626,7 @@ private void testRecommission(int numNamenodes, int numDatanodes)
writeFile(fileSys, file1, replicas); writeFile(fileSys, file1, replicas);
// Decommission one node. Verify that node is decommissioned. // Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes, DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
AdminStates.DECOMMISSIONED); AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode); decommissionedNodes.add(decomNode);
@ -510,7 +650,7 @@ private void testRecommission(int numNamenodes, int numDatanodes)
+ tries + " times.", tries < 20); + tries + " times.", tries < 20);
// stop decommission and check if the new replicas are removed // stop decommission and check if the new replicas are removed
recomissionNode(decomNode); recomissionNode(0, decomNode);
// wait for the block to be deleted // wait for the block to be deleted
tries = 0; tries = 0;
while (tries++ < 20) { while (tries++ < 20) {
@ -561,7 +701,7 @@ public void testClusterStats(int numNameNodes) throws IOException,
FSNamesystem fsn = cluster.getNamesystem(i); FSNamesystem fsn = cluster.getNamesystem(i);
NameNode namenode = cluster.getNameNode(i); NameNode namenode = cluster.getNameNode(i);
DatanodeInfo downnode = decommissionNode(i, null, DatanodeInfo downnode = decommissionNode(i, null, null,
AdminStates.DECOMMISSION_INPROGRESS); AdminStates.DECOMMISSION_INPROGRESS);
// Check namenode stats for multiple datanode heartbeats // Check namenode stats for multiple datanode heartbeats
verifyStats(namenode, fsn, downnode, true); verifyStats(namenode, fsn, downnode, true);