YARN-4386. refreshNodesGracefully() should send recommission event to active RMNodes only. Contributed by Kuhu Shukla.
This commit is contained in:
parent
5e7d4d51f8
commit
3fab88540f
@ -1433,6 +1433,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-4654. Yarn node label CLI should parse "=" correctly when trying to
|
YARN-4654. Yarn node label CLI should parse "=" correctly when trying to
|
||||||
remove all labels on a node. (Naganarasimha G R via rohithsharmaks)
|
remove all labels on a node. (Naganarasimha G R via rohithsharmaks)
|
||||||
|
|
||||||
|
YARN-4386. refreshNodesGracefully() should send recommission event to active
|
||||||
|
RMNodes only. (Kuhu Shukla via junping_du)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -400,8 +400,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
|
|||||||
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
|
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
|
||||||
} else {
|
} else {
|
||||||
// Recommissioning the nodes
|
// Recommissioning the nodes
|
||||||
if (entry.getValue().getState() == NodeState.DECOMMISSIONING
|
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
|
||||||
|| entry.getValue().getState() == NodeState.DECOMMISSIONED) {
|
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
|
.handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
|
||||||
}
|
}
|
||||||
|
@ -1269,6 +1269,41 @@ public void testInitDecommMetricNoRegistration() throws Exception {
|
|||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncorrectRecommission() throws Exception {
|
||||||
|
//Check decommissioned node not get recommissioned with graceful refresh
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
File excludeHostFile =
|
||||||
|
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
|
||||||
|
writeToHostsFile(excludeHostFile, "host3", "host2");
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
excludeHostFile.getAbsolutePath());
|
||||||
|
writeToHostsFile(hostFile, "host1", "host2");
|
||||||
|
writeToHostsFile(excludeHostFile, "host1");
|
||||||
|
rm.getNodesListManager().refreshNodesGracefully(conf);
|
||||||
|
rm.drainEvents();
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
rm.drainEvents();
|
||||||
|
Assert.assertTrue("Node " + nm1.getNodeId().getHost() +
|
||||||
|
" should be Decommissioned", rm.getRMContext()
|
||||||
|
.getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState
|
||||||
|
.DECOMMISSIONED);
|
||||||
|
writeToHostsFile(excludeHostFile, "");
|
||||||
|
rm.getNodesListManager().refreshNodesGracefully(conf);
|
||||||
|
rm.drainEvents();
|
||||||
|
Assert.assertTrue("Node " + nm1.getNodeId().getHost() +
|
||||||
|
" should be Decommissioned", rm.getRMContext()
|
||||||
|
.getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState
|
||||||
|
.DECOMMISSIONED);
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private void writeToHostsFile(String... hosts) throws IOException {
|
private void writeToHostsFile(String... hosts) throws IOException {
|
||||||
writeToHostsFile(hostFile, hosts);
|
writeToHostsFile(hostFile, hosts);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user