YARN-8127. Resource leak when async scheduling is enabled. Contributed by Tao Yang.
This commit is contained in:
parent
b0aff8a962
commit
7eb783e263
@ -339,6 +339,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// If allocate from reserved container, make sure node is still reserved
|
||||
if (allocation.getAllocateFromReservedContainer() != null
|
||||
&& reservedContainerOnNode == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Try to allocate from reserved container " + allocation
|
||||
.getAllocateFromReservedContainer().getRmContainer()
|
||||
.getContainerId() + ", but node is not reserved");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Do we have enough space on this node?
|
||||
Resource availableResource = Resources.clone(
|
||||
|
@ -594,6 +594,97 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||
}
|
||||
}
|
||||
|
||||
// Testcase for YARN-8127
|
||||
@Test (timeout = 30000)
|
||||
public void testCommitDuplicatedAllocateFromReservedProposals()
|
||||
throws Exception {
|
||||
// disable async-scheduling for simulating complex scene
|
||||
Configuration disableAsyncConf = new Configuration(conf);
|
||||
disableAsyncConf.setBoolean(
|
||||
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
|
||||
|
||||
// init RM & NMs
|
||||
final MockRM rm = new MockRM(disableAsyncConf);
|
||||
rm.start();
|
||||
final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
|
||||
rm.registerNode("192.168.0.2:2234", 8 * GB);
|
||||
|
||||
// init scheduler & nodes
|
||||
while (
|
||||
((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
|
||||
.nodeCount() < 2) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
Assert.assertEquals(2,
|
||||
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
|
||||
.getNodeTracker().nodeCount());
|
||||
CapacityScheduler cs =
|
||||
(CapacityScheduler) rm.getRMContext().getScheduler();
|
||||
SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
|
||||
|
||||
// launch app
|
||||
RMApp app = rm.submitApp(1 * GB, "app", "user", null, false, "default",
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
|
||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
||||
FiCaSchedulerApp schedulerApp =
|
||||
cs.getApplicationAttempt(am.getApplicationAttemptId());
|
||||
|
||||
// app asks 1 * 6G container
|
||||
// nm1 runs 2 container(container_01/AM, container_02)
|
||||
allocateAndLaunchContainers(am, nm1, rm, 1,
|
||||
Resources.createResource(6 * GB), 0, 2);
|
||||
Assert.assertEquals(2, sn1.getNumContainers());
|
||||
Assert.assertEquals(1 * GB, sn1.getUnallocatedResource().getMemorySize());
|
||||
|
||||
// app asks 5 * 2G container
|
||||
// nm1 reserves 1 * 2G containers
|
||||
am.allocate(Arrays.asList(ResourceRequest
|
||||
.newInstance(Priority.newInstance(0), "*",
|
||||
Resources.createResource(2 * GB), 5)), null);
|
||||
cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
|
||||
Assert.assertEquals(1, schedulerApp.getReservedContainers().size());
|
||||
|
||||
// rm kills 1 * 6G container_02
|
||||
for (RMContainer rmContainer : sn1.getCopiedListOfRunningContainers()) {
|
||||
if (rmContainer.getContainerId().getContainerId() != 1) {
|
||||
cs.completedContainer(rmContainer, ContainerStatus
|
||||
.newInstance(rmContainer.getContainerId(),
|
||||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL);
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(7 * GB, sn1.getUnallocatedResource().getMemorySize());
|
||||
|
||||
final CapacityScheduler spyCs = Mockito.spy(cs);
|
||||
// handle CapacityScheduler#tryCommit, submit duplicated proposals
|
||||
// that do allocation for reserved container for three times,
|
||||
// to simulate that case in YARN-8127
|
||||
Mockito.doAnswer(new Answer<Object>() {
|
||||
public Boolean answer(InvocationOnMock invocation) throws Exception {
|
||||
ResourceCommitRequest request =
|
||||
(ResourceCommitRequest) invocation.getArguments()[1];
|
||||
if (request.getFirstAllocatedOrReservedContainer()
|
||||
.getAllocateFromReservedContainer() != null) {
|
||||
for (int i=0; i<3; i++) {
|
||||
cs.tryCommit((Resource) invocation.getArguments()[0],
|
||||
(ResourceCommitRequest) invocation.getArguments()[1],
|
||||
(Boolean) invocation.getArguments()[2]);
|
||||
}
|
||||
Assert.assertEquals(2, sn1.getCopiedListOfRunningContainers().size());
|
||||
Assert.assertEquals(5 * GB,
|
||||
sn1.getUnallocatedResource().getMemorySize());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}).when(spyCs).tryCommit(Mockito.any(Resource.class),
|
||||
Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
|
||||
|
||||
spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
|
||||
if (nmHeartbeatThread != null) {
|
||||
nmHeartbeatThread.setShouldStop();
|
||||
|
Loading…
x
Reference in New Issue
Block a user