HDFS-11378. Verify multiple DataNodes can be decommissioned/maintenance at the same time. (Manoj Govindassamy via mingma)

Ming Ma committed on 2017-01-27 16:16:42 -08:00
commit 312b36d113 (parent ebd40056a0)
3 changed files with 186 additions and 44 deletions


@@ -22,11 +22,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -149,10 +151,18 @@ static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
     }
   }
 
-  /*
-   * decommission the DN or put the DN into maintenance for datanodeUuid or one
-   * random node if datanodeUuid is null.
-   * And wait for the node to reach the given {@code waitForState}.
+  /**
+   * Decommission or perform Maintenance for DataNodes and wait for them to
+   * reach the expected state.
+   *
+   * @param nnIndex NameNode index
+   * @param datanodeUuid DataNode to decommission/maintenance, or a random
+   *                     DataNode if null
+   * @param maintenanceExpirationInMS Maintenance expiration time
+   * @param decommissionedNodes List of DataNodes already decommissioned
+   * @param waitForState Await for this state for datanodeUuid DataNode
+   * @return DatanodeInfo DataNode taken out of service
+   * @throws IOException
    */
   protected DatanodeInfo takeNodeOutofService(int nnIndex,
       String datanodeUuid, long maintenanceExpirationInMS,
@@ -162,48 +172,91 @@ protected DatanodeInfo takeNodeOutofService(int nnIndex,
         maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
   }
 
-  /*
-   * decommission the DN or put the DN to maintenance set by datanodeUuid
-   * Pick randome node if datanodeUuid == null
-   * wait for the node to reach the given {@code waitForState}.
+  /**
+   * Decommission or perform Maintenance for DataNodes and wait for them to
+   * reach the expected state.
+   *
+   * @param nnIndex NameNode index
+   * @param datanodeUuid DataNode to decommission/maintenance, or a random
+   *                     DataNode if null
+   * @param maintenanceExpirationInMS Maintenance expiration time
+   * @param decommissionedNodes List of DataNodes already decommissioned
+   * @param inMaintenanceNodes Map of DataNodes already entering/in maintenance
+   * @param waitForState Await for this state for datanodeUuid DataNode
+   * @return DatanodeInfo DataNode taken out of service
+   * @throws IOException
    */
   protected DatanodeInfo takeNodeOutofService(int nnIndex,
       String datanodeUuid, long maintenanceExpirationInMS,
       List<DatanodeInfo> decommissionedNodes,
       Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
       throws IOException {
+    return takeNodeOutofService(nnIndex, (datanodeUuid != null ?
+        Lists.newArrayList(datanodeUuid) : null),
+        maintenanceExpirationInMS, decommissionedNodes, inMaintenanceNodes,
+        waitForState).get(0);
+  }
+
+  /**
+   * Decommission or perform Maintenance for DataNodes and wait for them to
+   * reach the expected state.
+   *
+   * @param nnIndex NameNode index
+   * @param dataNodeUuids DataNodes to decommission/maintenance, or a random
+   *                      DataNode if null
+   * @param maintenanceExpirationInMS Maintenance expiration time
+   * @param decommissionedNodes List of DataNodes already decommissioned
+   * @param inMaintenanceNodes Map of DataNodes already entering/in maintenance
+   * @param waitForState Await for this state for datanodeUuid DataNode
+   * @return DatanodeInfo DataNode taken out of service
+   * @throws IOException
+   */
+  protected List<DatanodeInfo> takeNodeOutofService(int nnIndex,
+      List<String> dataNodeUuids, long maintenanceExpirationInMS,
+      List<DatanodeInfo> decommissionedNodes,
+      Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
+      throws IOException {
     DFSClient client = getDfsClient(nnIndex);
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
     boolean isDecommissionRequest =
         waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
         waitForState == AdminStates.DECOMMISSIONED;
 
-    //
-    // pick one datanode randomly unless the caller specifies one.
-    //
-    int index = 0;
-    if (datanodeUuid == null) {
+    List<String> dataNodeNames = new ArrayList<>();
+    List<DatanodeInfo> datanodeInfos = new ArrayList<>();
+    // pick one DataNode randomly unless the caller specifies one.
+    if (dataNodeUuids == null) {
       boolean found = false;
       while (!found) {
-        index = myrand.nextInt(info.length);
+        int index = myrand.nextInt(info.length);
         if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
             (!isDecommissionRequest && !info[index].isInMaintenance())) {
+          dataNodeNames.add(info[index].getXferAddr());
+          datanodeInfos.add(NameNodeAdapter.getDatanode(
+              cluster.getNamesystem(nnIndex), info[index]));
           found = true;
         }
       }
     } else {
-      // The caller specifies a DN
-      for (; index < info.length; index++) {
-        if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
-          break;
+      // The caller specified a DataNode
+      for (String datanodeUuid : dataNodeUuids) {
+        boolean found = false;
+        for (int index = 0; index < info.length; index++) {
+          if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
+            dataNodeNames.add(info[index].getXferAddr());
+            datanodeInfos.add(NameNodeAdapter.getDatanode(
+                cluster.getNamesystem(nnIndex), info[index]));
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          throw new IOException("invalid datanodeUuid " + datanodeUuid);
         }
       }
-      if (index == info.length) {
-        throw new IOException("invalid datanodeUuid " + datanodeUuid);
-      }
     }
-    String nodename = info[index].getXferAddr();
-    LOG.info("Taking node: " + nodename + " out of service");
+    LOG.info("Taking node: " + Arrays.toString(dataNodeNames.toArray())
+        + " out of service");
 
     ArrayList<String> decommissionNodes = new ArrayList<String>();
     if (decommissionedNodes != null) {
@@ -220,18 +273,20 @@ protected DatanodeInfo takeNodeOutofService(int nnIndex,
     }
 
     if (isDecommissionRequest) {
-      decommissionNodes.add(nodename);
+      for (String dataNodeName : dataNodeNames) {
+        decommissionNodes.add(dataNodeName);
+      }
     } else {
-      maintenanceNodes.put(nodename, maintenanceExpirationInMS);
+      for (String dataNodeName : dataNodeNames) {
+        maintenanceNodes.put(dataNodeName, maintenanceExpirationInMS);
+      }
     }
 
     // write node names into the json host file.
     hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
     refreshNodes(nnIndex);
-    DatanodeInfo ret = NameNodeAdapter.getDatanode(
-        cluster.getNamesystem(nnIndex), info[index]);
-    waitNodeState(ret, waitForState);
-    return ret;
+    waitNodeState(datanodeInfos, waitForState);
+    return datanodeInfos;
   }
 
   /* Ask a specific NN to put the datanode in service and wait for it
@@ -270,23 +325,31 @@ protected void putNodeInService(int nnIndex,
     putNodeInService(nnIndex, datanodeInfo);
   }
 
-  /*
-   * Wait till node is transitioned to the expected state.
+  /**
+   * Wait till DataNode is transitioned to the expected state.
    */
-  protected void waitNodeState(DatanodeInfo node,
-      AdminStates state) {
-    boolean done = state == node.getAdminState();
-    while (!done) {
-      LOG.info("Waiting for node " + node + " to change state to "
-          + state + " current state: " + node.getAdminState());
-      try {
-        Thread.sleep(HEARTBEAT_INTERVAL * 500);
-      } catch (InterruptedException e) {
-        // nothing
+  protected void waitNodeState(DatanodeInfo node, AdminStates state) {
+    waitNodeState(Lists.newArrayList(node), state);
+  }
+
+  /**
+   * Wait till all DataNodes are transitioned to the expected state.
+   */
+  protected void waitNodeState(List<DatanodeInfo> nodes, AdminStates state) {
+    for (DatanodeInfo node : nodes) {
+      boolean done = (state == node.getAdminState());
+      while (!done) {
+        LOG.info("Waiting for node " + node + " to change state to "
+            + state + " current state: " + node.getAdminState());
+        try {
+          Thread.sleep(HEARTBEAT_INTERVAL * 500);
+        } catch (InterruptedException e) {
+          // nothing
+        }
+        done = (state == node.getAdminState());
       }
-      done = state == node.getAdminState();
+      LOG.info("node " + node + " reached the state " + state);
     }
-    LOG.info("node " + node + " reached the state " + state);
   }
 
   protected void initIncludeHost(String hostNameAndPort) throws IOException {
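
For orientation, here is a minimal, illustrative sketch (not part of the patch) of how a test built on this base class could drive the new list-based overload end to end. The test method name is hypothetical; the helpers it calls (startCluster, getCluster, takeNodeOutofService, putNodeInService) and the types (DatanodeInfo, AdminStates, Lists) are the ones shown in the diff above, and the usual JUnit imports are assumed.

  // Illustrative sketch only; assumes a JUnit test class that extends the
  // admin-states base test modified above.
  @Test(timeout = 360000)
  public void testTwoNodesOutOfServiceSketch() throws Exception {
    startCluster(1, 5);  // 1 NameNode, 5 DataNodes
    // One call takes DataNodes 0 and 1 out of service and blocks until both
    // report DECOMMISSIONED.
    List<DatanodeInfo> outOfService = takeNodeOutofService(0,
        Lists.newArrayList(
            getCluster().getDataNodes().get(0).getDatanodeUuid(),
            getCluster().getDataNodes().get(1).getDatanodeUuid()),
        Long.MAX_VALUE, null, null, AdminStates.DECOMMISSIONED);
    // Each returned DatanodeInfo reflects the admin state that was waited on.
    for (DatanodeInfo dn : outOfService) {
      assertEquals(AdminStates.DECOMMISSIONED, dn.getAdminState());
      putNodeInService(0, dn);  // put the node back in service
    }
  }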


@@ -1083,4 +1083,47 @@ public void testUsedCapacity() throws Exception {
     assertTrue("BlockPoolUsed should not be the same after a node has " +
         "been decommissioned!",initialBlockPoolUsed != newBlockPoolUsed);
   }
+
+  /**
+   * Verify that multiple DataNodes can be decommissioned at the same time.
+   */
+  @Test(timeout = 360000)
+  public void testMultipleNodesDecommission() throws Exception {
+    startCluster(1, 5);
+    final Path file = new Path("/testMultipleNodesDecommission.dat");
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    int repl = 3;
+    writeFile(fileSys, file, repl, 1);
+    // Request Decommission for DataNodes 1 and 2.
+    List<DatanodeInfo> decomDataNodes = takeNodeOutofService(0,
+        Lists.newArrayList(getCluster().getDataNodes().get(0).getDatanodeUuid(),
+            getCluster().getDataNodes().get(1).getDatanodeUuid()),
+        Long.MAX_VALUE, null, null, AdminStates.DECOMMISSIONED);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          String errMsg = checkFile(fileSys, file, repl,
+              decomDataNodes.get(0).getXferAddr(), 5);
+          if (errMsg != null) {
+            LOG.warn("Check file: " + errMsg);
+          }
+          return true;
+        } catch (IOException e) {
+          LOG.warn("Check file: " + e);
+          return false;
+        }
+      }
+    }, 500, 30000);
+
+    // Put the decommissioned nodes back in service.
+    for (DatanodeInfo datanodeInfo : decomDataNodes) {
+      putNodeInService(0, datanodeInfo);
+    }
+    cleanupFile(fileSys, file);
+  }
 }


@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -520,6 +521,41 @@ private void testDecommissionDifferentNodeAfterMaintenance(int repl)
     cleanupFile(fileSys, file);
   }
 
+  /**
+   * Verify that multiple DataNodes can transition to maintenance state
+   * at the same time.
+   */
+  @Test(timeout = 360000)
+  public void testMultipleNodesMaintenance() throws Exception {
+    startCluster(1, 5);
+    final Path file = new Path("/testMultipleNodesMaintenance.dat");
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    int repl = 3;
+    writeFile(fileSys, file, repl, 1);
+    final DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys,
+        file);
+
+    // Request maintenance for DataNodes 1 and 2, which have the file blocks.
+    List<DatanodeInfo> maintenanceDN = takeNodeOutofService(0,
+        Lists.newArrayList(nodes[0].getDatanodeUuid(),
+            nodes[1].getDatanodeUuid()), Long.MAX_VALUE, null, null,
+        AdminStates.IN_MAINTENANCE);
+
+    // Verify file replication matches maintenance state min replication.
+    assertNull(checkWithRetry(ns, fileSys, file, 1, null, nodes[0]));
+
+    // Put the maintenance nodes back in service.
+    for (DatanodeInfo datanodeInfo : maintenanceDN) {
+      putNodeInService(0, datanodeInfo);
+    }
+
+    // Verify file replication catches up to the old state.
+    assertNull(checkWithRetry(ns, fileSys, file, repl, null));
+
+    cleanupFile(fileSys, file);
+  }
+
   @Test(timeout = 360000)
   public void testChangeReplicationFactors() throws IOException {