YARN-8774. Memory leak when CapacityScheduler allocates from reserved container with non-default label. Contributed by Tao Yang.
This commit is contained in:
parent
7093afd874
commit
8598b498bc
@ -750,9 +750,25 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
|
|||||||
// When reserving container
|
// When reserving container
|
||||||
RMContainer updatedContainer = reservedContainer;
|
RMContainer updatedContainer = reservedContainer;
|
||||||
if (updatedContainer == null) {
|
if (updatedContainer == null) {
|
||||||
|
AppPlacementAllocator<FiCaSchedulerNode> ps =
|
||||||
|
application.getAppSchedulingInfo()
|
||||||
|
.getAppPlacementAllocator(schedulerKey);
|
||||||
|
if (null == ps) {
|
||||||
|
LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
|
||||||
|
+ " for application=" + application.getApplicationId()
|
||||||
|
+ " schedulerRequestKey=" + schedulerKey);
|
||||||
|
ActivitiesLogger.APP
|
||||||
|
.recordAppActivityWithoutAllocation(activitiesManager, node,
|
||||||
|
application, schedulerKey.getPriority(),
|
||||||
|
ActivityDiagnosticConstant.
|
||||||
|
PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST,
|
||||||
|
ActivityState.REJECTED);
|
||||||
|
return ContainerAllocation.PRIORITY_SKIPPED;
|
||||||
|
}
|
||||||
updatedContainer = new RMContainerImpl(container, schedulerKey,
|
updatedContainer = new RMContainerImpl(container, schedulerKey,
|
||||||
application.getApplicationAttemptId(), node.getNodeID(),
|
application.getApplicationAttemptId(), node.getNodeID(),
|
||||||
application.getAppSchedulingInfo().getUser(), rmContext);
|
application.getAppSchedulingInfo().getUser(), rmContext,
|
||||||
|
ps.getPrimaryRequestedNodePartition());
|
||||||
}
|
}
|
||||||
allocationResult.updatedContainer = updatedContainer;
|
allocationResult.updatedContainer = updatedContainer;
|
||||||
}
|
}
|
||||||
|
@ -547,6 +547,68 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
|||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 120000)
|
||||||
|
public void testRMContainerLeakInLeafQueue() throws Exception {
|
||||||
|
// set node -> label
|
||||||
|
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
|
||||||
|
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
|
||||||
|
NodeId.newInstance("h2", 0), toSet("x")));
|
||||||
|
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm1 =
|
||||||
|
new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
|
||||||
|
@Override public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
|
||||||
|
rm1.registerNode("h2:1234", 8 * GB); // label = x
|
||||||
|
|
||||||
|
// launch an app to queue a1 (label = x), and check all container will
|
||||||
|
// be allocated in h1
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1");
|
||||||
|
MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||||
|
|
||||||
|
// request a container.
|
||||||
|
am1.allocate("*", 7 * GB, 2, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||||
|
|
||||||
|
// Do node heartbeats 1 time
|
||||||
|
// scheduler will reserve a container for app1
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
// Check if a 4G container allocated for app1, and 4G is reserved
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
||||||
|
|
||||||
|
// kill app2 then do node heartbeat 1 time
|
||||||
|
// scheduler will allocate a container from the reserved container on nm1
|
||||||
|
rm1.killApp(app2.getApplicationId());
|
||||||
|
rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
|
||||||
|
|
||||||
|
// After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers should
|
||||||
|
// be clean, otherwise resource leak happened
|
||||||
|
rm1.killApp(app1.getApplicationId());
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||||
|
Assert.assertEquals(0, leafQueue.getIgnoreExclusivityRMContainers().size());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
|
|
||||||
private void checkPendingResource(MockRM rm, int priority,
|
private void checkPendingResource(MockRM rm, int priority,
|
||||||
ApplicationAttemptId attemptId, int memory) {
|
ApplicationAttemptId attemptId, int memory) {
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
|
||||||
|
Loading…
Reference in New Issue
Block a user