YARN-116. Add the ability to change the RM include/exclude file without a restart. (Contributed by xieguiming and Harsh J)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1391912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-09-29 22:39:03 +00:00
parent ab52567d6c
commit 39f15579b8
4 changed files with 92 additions and 8 deletions

View File

@ -40,6 +40,9 @@ Release 2.0.3-alpha - Unreleased
YARN-53. Added the missing getGroups API to ResourceManager. (Bo Wang via YARN-53. Added the missing getGroups API to ResourceManager. (Bo Wang via
vinodkv) vinodkv)
YARN-116. Add the ability to change the RM include/exclude file without
a restart. (xieguiming and Harsh J via sseth)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -189,15 +189,14 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnRemoteException { throws YarnRemoteException {
UserGroupInformation user = checkAcls("refreshNodes"); UserGroupInformation user = checkAcls("refreshNodes");
try { try {
this.nodesListManager.refreshNodes(); this.nodesListManager.refreshNodes(new YarnConfiguration());
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes", RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes",
"AdminService"); "AdminService");
return recordFactory.newRecordInstance(RefreshNodesResponse.class); return recordFactory.newRecordInstance(RefreshNodesResponse.class);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.info("Exception refreshing nodes ", ioe); LOG.info("Exception refreshing nodes ", ioe);
RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes", RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
adminAcl.toString(), "AdminService", adminAcl.toString(), "AdminService", "Exception refreshing nodes");
"Exception refreshing nodes");
throw RPCUtil.getRemoteException(ioe); throw RPCUtil.getRemoteException(ioe);
} }
} }

View File

@ -103,8 +103,16 @@ private void printConfiguredHosts() {
} }
} }
public void refreshNodes() throws IOException { public void refreshNodes(Configuration yarnConf) throws IOException {
synchronized (hostsReader) { synchronized (hostsReader) {
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
hostsReader.updateFileNames(yarnConf.get(
YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH), yarnConf.get(
YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
hostsReader.refresh(); hostsReader.refresh();
printConfiguredHosts(); printConfiguredHosts();
} }

View File

@ -54,7 +54,7 @@ public class TestResourceTrackerService {
private MockRM rm; private MockRM rm;
/** /**
* decommissioning using a include hosts file * Decommissioning using a pre-configured include hosts file
*/ */
@Test @Test
public void testDecommissionWithIncludeHosts() throws Exception { public void testDecommissionWithIncludeHosts() throws Exception {
@ -86,7 +86,7 @@ public void testDecommissionWithIncludeHosts() throws Exception {
String ip = NetUtils.normalizeHostName("localhost"); String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host1", ip); writeToHostsFile("host1", ip);
rm.getNodesListManager().refreshNodes(); rm.getNodesListManager().refreshNodes(conf);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@ -106,7 +106,7 @@ public void testDecommissionWithIncludeHosts() throws Exception {
} }
/** /**
* decommissioning using a exclude hosts file * Decommissioning using a pre-configured exclude hosts file
*/ */
@Test @Test
public void testDecommissionWithExcludeHosts() throws Exception { public void testDecommissionWithExcludeHosts() throws Exception {
@ -133,7 +133,7 @@ public void testDecommissionWithExcludeHosts() throws Exception {
String ip = NetUtils.normalizeHostName("localhost"); String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host2", ip); writeToHostsFile("host2", ip);
rm.getNodesListManager().refreshNodes(); rm.getNodesListManager().refreshNodes(conf);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@ -148,6 +148,80 @@ public void testDecommissionWithExcludeHosts() throws Exception {
checkDecommissionedNMCount(rm, ++metricCount); checkDecommissionedNMCount(rm, ++metricCount);
} }
/**
 * Decommissioning using a post-configured include hosts file.
 * <p>
 * The RM is started from a fresh {@code Configuration} on which this test
 * sets no include path. Two nodes are registered and both must heartbeat
 * with {@code NodeAction.NORMAL}. The test then writes a hosts file naming
 * only host1, sets {@code YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH} to
 * that file, and refreshes the node list with the updated configuration.
 * Afterwards host1 must still get {@code NORMAL}, host2 must get
 * {@code SHUTDOWN}, and the decommissioned-node metric must have grown by
 * one.
 *
 * @throws Exception if the RM, the node heartbeats, or the refresh fail
 */
@Test
public void testAddNewIncludePathToConfiguration() throws Exception {
  Configuration conf = new Configuration();
  rm = new MockRM(conf);
  rm.start();
  MockNM nm1 = rm.registerNode("host1:1234", 5120);
  MockNM nm2 = rm.registerNode("host2:5678", 10240);

  ClusterMetrics metrics = ClusterMetrics.getMetrics();
  // Use a JUnit assertion: the Java 'assert' keyword is a no-op unless the
  // JVM is started with -ea, so it would not reliably guard the test.
  Assert.assertNotNull(metrics);
  int initialMetricCount = metrics.getNumDecommisionedNMs();

  // Before any include file is configured both nodes are accepted.
  HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
  Assert.assertEquals(
      NodeAction.NORMAL,
      nodeHeartbeat.getNodeAction());
  nodeHeartbeat = nm2.nodeHeartbeat(true);
  Assert.assertEquals(
      NodeAction.NORMAL,
      nodeHeartbeat.getNodeAction());

  // Configure the include file only now, after the RM is already running,
  // and ask the nodes list manager to re-read it from the updated conf.
  writeToHostsFile("host1");
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
      .getAbsolutePath());
  rm.getNodesListManager().refreshNodes(conf);

  nodeHeartbeat = nm1.nodeHeartbeat(true);
  Assert.assertEquals(
      "Node should not have been decomissioned.",
      NodeAction.NORMAL,
      nodeHeartbeat.getNodeAction());
  nodeHeartbeat = nm2.nodeHeartbeat(true);
  Assert.assertEquals("Node should have been decomissioned but is in state " +
      nodeHeartbeat.getNodeAction(),
      NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
  checkDecommissionedNMCount(rm, ++initialMetricCount);
}
/**
 * Decommissioning using a post-configured exclude hosts file.
 * <p>
 * The RM is started from a fresh {@code Configuration} on which this test
 * sets no exclude path. Two nodes are registered and both must heartbeat
 * with {@code NodeAction.NORMAL}. The test then writes a hosts file naming
 * host2, sets {@code YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH} to that
 * file, and refreshes the node list with the updated configuration.
 * Afterwards host1 must still get {@code NORMAL}, host2 must get
 * {@code SHUTDOWN}, and the decommissioned-node metric must have grown by
 * one.
 *
 * @throws Exception if the RM, the node heartbeats, or the refresh fail
 */
@Test
public void testAddNewExcludePathToConfiguration() throws Exception {
  Configuration conf = new Configuration();
  rm = new MockRM(conf);
  rm.start();
  MockNM nm1 = rm.registerNode("host1:1234", 5120);
  MockNM nm2 = rm.registerNode("host2:5678", 10240);

  ClusterMetrics metrics = ClusterMetrics.getMetrics();
  // Use a JUnit assertion: the Java 'assert' keyword is a no-op unless the
  // JVM is started with -ea, so it would not reliably guard the test.
  Assert.assertNotNull(metrics);
  int initialMetricCount = metrics.getNumDecommisionedNMs();

  // Before any exclude file is configured both nodes are accepted.
  HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
  Assert.assertEquals(
      NodeAction.NORMAL,
      nodeHeartbeat.getNodeAction());
  nodeHeartbeat = nm2.nodeHeartbeat(true);
  Assert.assertEquals(
      NodeAction.NORMAL,
      nodeHeartbeat.getNodeAction());

  // Configure the exclude file only now, after the RM is already running,
  // and ask the nodes list manager to re-read it from the updated conf.
  writeToHostsFile("host2");
  conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
      .getAbsolutePath());
  rm.getNodesListManager().refreshNodes(conf);

  nodeHeartbeat = nm1.nodeHeartbeat(true);
  Assert.assertEquals(
      "Node should not have been decomissioned.",
      NodeAction.NORMAL,
      nodeHeartbeat.getNodeAction());
  nodeHeartbeat = nm2.nodeHeartbeat(true);
  Assert.assertEquals("Node should have been decomissioned but is in state " +
      nodeHeartbeat.getNodeAction(),
      NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
  checkDecommissionedNMCount(rm, ++initialMetricCount);
}
@Test @Test
public void testNodeRegistrationFailure() throws Exception { public void testNodeRegistrationFailure() throws Exception {
writeToHostsFile("host1"); writeToHostsFile("host1");