YARN-5195. RM intermittently crashed with an NPE while handling the APP_ATTEMPT_REMOVED event when async-scheduling is enabled in CapacityScheduler. (sandflee via wangda)
This commit is contained in:
parent
2d8d183b19
commit
d62e121ffc
@ -1209,11 +1209,18 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||
public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||
if (rmContext.isWorkPreservingRecoveryEnabled()
|
||||
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!nodeTracker.exists(node.getNodeID())) {
|
||||
LOG.info("Skipping scheduling as the node " + node.getNodeID() +
|
||||
" has been removed");
|
||||
return;
|
||||
}
|
||||
|
||||
// reset allocation and reservation stats before we start doing any work
|
||||
updateSchedulerHealth(lastNodeUpdateTime, node,
|
||||
new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
|
||||
|
@ -3375,4 +3375,44 @@ public void handle(Event event) {
|
||||
Assert.assertEquals(availableResource.getMemorySize(), 0);
|
||||
Assert.assertEquals(availableResource.getVirtualCores(), 0);
|
||||
}
|
||||
|
||||
/**
 * Regression test for YARN-5195: asks the scheduler to allocate containers on
 * a node that has already been removed, then removes the application attempt.
 * The test contains no explicit assertions; it passes when neither call throws.
 */
@Test
|
||||
public void testSchedulingOnRemovedNode() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
// Asynchronous scheduling is switched off here; the test invokes
// allocateContainersToNode directly further down instead.
conf.setBoolean(
|
||||
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
|
||||
false);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
RMApp app = rm.submitApp(100);
|
||||
rm.drainEvents();
|
||||
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
|
||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
||||
|
||||
// Register a second node, nm2. nm2 (not nm1) is the node removed below;
// the AM was launched through nm1, so the AM stays alive.
|
||||
MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
|
||||
|
||||
am.allocate(ResourceRequest.ANY, 2048, 1, null);
|
||||
|
||||
CapacityScheduler scheduler =
|
||||
(CapacityScheduler) rm.getRMContext().getScheduler();
|
||||
// Fetch the scheduler's node object for nm2 before the removal event so the
// now-stale reference can be handed to allocateContainersToNode afterwards.
FiCaSchedulerNode node =
|
||||
(FiCaSchedulerNode)
|
||||
scheduler.getNodeTracker().getNode(nm2.getNodeId());
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
|
||||
// The scheduler node has been removed; try to allocate a container on it
// using the reference obtained above.
|
||||
scheduler.allocateContainersToNode(node);
|
||||
|
||||
// Remove the app attempt. Per the YARN-5195 title, handling this event is
// where the RM previously hit the NPE.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||
new AppAttemptRemovedSchedulerEvent(
|
||||
am.getApplicationAttemptId(),
|
||||
RMAppAttemptState.FINISHED, false);
|
||||
scheduler.handle(appRemovedEvent1);
|
||||
// NOTE(review): rm.stop() is skipped if any call above throws; consider
// wrapping the test body in try/finally so the MockRM is always stopped.
rm.stop();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user