MAPREDUCE-3873. Fixed NodeManagers' decommissioning at RM to accept IP addresses also. Contributed by xieguiming and vinodkv.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1346671 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-06-05 23:52:16 +00:00
parent d44513c376
commit 67cbc289e4
3 changed files with 40 additions and 18 deletions

View File

@ -163,6 +163,9 @@ Release 2.0.1-alpha - UNRELEASED
MAPREDUCE-4313. TestTokenCache doesn't compile due
TokenCache.getDelegationToken compilation error (bobby)
MAPREDUCE-3873. Fixed NodeManagers' decommissioning at RM to accept IP
addresses also. (xieguiming via vinodkv)
Release 2.0.0-alpha - 05-23-2012
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -37,6 +38,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.service.AbstractService;
@SuppressWarnings("unchecked")
public class NodesListManager extends AbstractService implements
EventHandler<NodesListManagerEvent> {
@ -112,8 +114,10 @@ public boolean isValidNode(String hostName) {
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
return ((hostsList.isEmpty() || hostsList.contains(hostName)) &&
!excludeList.contains(hostName));
String ip = NetUtils.normalizeHostName(hostName);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
}

View File

@ -28,14 +28,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
@ -52,8 +52,6 @@ public class TestResourceTrackerService {
"test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
private MockRM rm;
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
/**
* decommissioning using a include hosts file
@ -61,9 +59,9 @@ public class TestResourceTrackerService {
@Test
public void testDecommissionWithIncludeHosts() throws Exception {
writeToHostsFile("host1", "host2");
writeToHostsFile("localhost", "host1", "host2");
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm = new MockRM(conf);
@ -71,17 +69,22 @@ public void testDecommissionWithIncludeHosts() throws Exception {
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs();
int metricCount = metrics.getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
writeToHostsFile("host1");
// To test that IPs also work
String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host1", ip);
rm.getNodesListManager().refreshNodes();
@ -94,7 +97,12 @@ public void testDecommissionWithIncludeHosts() throws Exception {
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
.equals(nodeHeartbeat.getNodeAction()));
checkDecommissionedNMCount(rm, ++initialMetricCount);
checkDecommissionedNMCount(rm, ++metricCount);
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
.getNumDecommisionedNMs());
}
/**
@ -103,7 +111,7 @@ public void testDecommissionWithIncludeHosts() throws Exception {
@Test
public void testDecommissionWithExcludeHosts() throws Exception {
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
writeToHostsFile("");
@ -112,16 +120,18 @@ public void testDecommissionWithExcludeHosts() throws Exception {
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
int initialMetricCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
writeToHostsFile("host2");
// To test that IPs also work
String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host2", ip);
rm.getNodesListManager().refreshNodes();
@ -130,14 +140,19 @@ public void testDecommissionWithExcludeHosts() throws Exception {
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
checkDecommissionedNMCount(rm, ++initialMetricCount);
checkDecommissionedNMCount(rm, ++metricCount);
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
checkDecommissionedNMCount(rm, ++metricCount);
}
@Test
public void testNodeRegistrationFailure() throws Exception {
writeToHostsFile("host1");
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
@ -191,7 +206,7 @@ private void checkRebootedNMCount(MockRM rm2, int count)
@Test
public void testUnhealthyNodeStatus() throws Exception {
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm = new MockRM(conf);