YARN-8528. Final states in ContainerAllocation might be modified externally causing unexpected allocation results. Contributed by Xintong Song.
parent 7b25fb949b
commit cbf2026483
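ContainerAllocation exposes shared final result constants (APP_SKIPPED, LOCALITY_SKIPPED, PRIORITY_SKIPPED, QUEUE_SKIPPED) whose state field was package-visible and writable. A caller that "promoted" a returned result in place (allocation.state = AllocationState.APP_SKIPPED) therefore mutated the shared LOCALITY_SKIPPED constant, corrupting every later scheduling decision that compared against it. The patch makes the field private, routes reads through getAllocationState(), replaces the in-place mutation with a reference swap, and adds a regression test. Below is a minimal sketch of the bug pattern, using hypothetical stand-in classes rather than the real Hadoop sources:

// Hypothetical stand-ins for the pre-patch ContainerAllocation, not the
// actual Hadoop classes.
enum AllocationState { ALLOCATED, RESERVED, APP_SKIPPED, LOCALITY_SKIPPED }

class Allocation {
  // Package-visible and writable, like ContainerAllocation.state pre-patch.
  AllocationState state;

  Allocation(AllocationState state) {
    this.state = state;
  }

  // Shared singleton result, analogous to ContainerAllocation.LOCALITY_SKIPPED.
  static final Allocation LOCALITY_SKIPPED =
      new Allocation(AllocationState.LOCALITY_SKIPPED);

  public static void main(String[] args) {
    // An allocator returns the shared constant to signal a locality skip.
    Allocation result = LOCALITY_SKIPPED;

    // A caller "promotes" its local result to APP_SKIPPED in place...
    result.state = AllocationState.APP_SKIPPED;

    // ...and the shared constant is now corrupted for every later caller.
    System.out.println(LOCALITY_SKIPPED.state); // prints APP_SKIPPED
  }
}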
@@ -56,7 +56,7 @@ public class ContainerAllocation {
   RMContainer containerToBeUnreserved;
   private Resource resourceToBeAllocated = Resources.none();
-  AllocationState state;
+  private AllocationState state;
   NodeType containerNodeType = NodeType.NODE_LOCAL;
   NodeType requestLocalityType = null;
 
@@ -263,7 +263,7 @@ ContainerAllocation tryAllocateOnNode(Resource clusterResource,
         reservedContainer, schedulingMode, resourceLimits);
 
     if (null == reservedContainer) {
-      if (result.state == AllocationState.PRIORITY_SKIPPED) {
+      if (result.getAllocationState() == AllocationState.PRIORITY_SKIPPED) {
         // Don't count 'skipped nodes' as a scheduling opportunity!
         application.subtractSchedulingOpportunity(schedulerKey);
       }
@@ -487,8 +487,8 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
 
       // When a returned allocation is LOCALITY_SKIPPED, since we're in
       // off-switch request now, we will skip this app w.r.t priorities
-      if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
-        allocation.state = AllocationState.APP_SKIPPED;
+      if (allocation.getAllocationState() == AllocationState.LOCALITY_SKIPPED) {
+        allocation = ContainerAllocation.APP_SKIPPED;
       }
       allocation.requestLocalityType = requestLocalityType;
 
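The hunk above is the core of the fix: rather than writing allocation.state (which mutated the shared LOCALITY_SKIPPED constant in place), the allocator now swaps its local reference to the ContainerAllocation.APP_SKIPPED constant and reads state through the new getter. Sketched with the same hypothetical stand-ins as above:

// Fixed pattern: private field, getter for reads, and "promotion" swaps
// the reference so shared constants are never written to.
class AllocationFixed {
  private AllocationState state; // no longer writable from outside

  AllocationFixed(AllocationState state) {
    this.state = state;
  }

  AllocationState getAllocationState() {
    return state;
  }

  static final AllocationFixed APP_SKIPPED =
      new AllocationFixed(AllocationState.APP_SKIPPED);
  static final AllocationFixed LOCALITY_SKIPPED =
      new AllocationFixed(AllocationState.LOCALITY_SKIPPED);

  // Mirrors the hunk above: replace the reference instead of mutating it.
  static AllocationFixed promote(AllocationFixed allocation) {
    if (allocation.getAllocationState() == AllocationState.LOCALITY_SKIPPED) {
      allocation = APP_SKIPPED; // constants stay intact
    }
    return allocation;
  }
}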
@@ -836,8 +836,8 @@ private ContainerAllocation allocate(Resource clusterResource,
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,
           resourceLimits, schedulerKey, reservedContainer);
 
-      if (AllocationState.ALLOCATED == result.state
-          || AllocationState.RESERVED == result.state) {
+      if (AllocationState.ALLOCATED == result.getAllocationState()
+          || AllocationState.RESERVED == result.getAllocationState()) {
         result = doAllocation(result, node, schedulerKey, reservedContainer);
         break;
       }
@@ -134,6 +134,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -4930,4 +4932,50 @@ public Object answer(InvocationOnMock invocation) throws Exception {
     spyCs.handle(new NodeUpdateSchedulerEvent(
         spyCs.getNode(nm.getNodeId()).getRMNode()));
   }
+
+  // Testcase for YARN-8528
+  // This is to test whether ContainerAllocation constants are holding correct
+  // values during scheduling.
+  @Test
+  public void testContainerAllocationLocalitySkipped() throws Exception {
+    Assert.assertEquals(AllocationState.APP_SKIPPED,
+        ContainerAllocation.APP_SKIPPED.getAllocationState());
+    Assert.assertEquals(AllocationState.LOCALITY_SKIPPED,
+        ContainerAllocation.LOCALITY_SKIPPED.getAllocationState());
+    Assert.assertEquals(AllocationState.PRIORITY_SKIPPED,
+        ContainerAllocation.PRIORITY_SKIPPED.getAllocationState());
+    Assert.assertEquals(AllocationState.QUEUE_SKIPPED,
+        ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
+
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    rm.start();
+    final MockNM nm1 = rm.registerNode("h1:1234", 4 * GB);
+    final MockNM nm2 = rm.registerNode("h2:1234", 6 * GB); // maximum-allocation-mb = 6GB
+
+    // submit app and request resource
+    // container2 is larger than nm1 total resource, will trigger locality skip
+    final RMApp app = rm.submitApp(1 * GB, "app", "user");
+    final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    am.addRequests(new String[] {"*"}, 5 * GB, 1, 1, 2);
+    am.schedule();
+
+    // container1 (am) should be acquired, container2 should not
+    RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(node1));
+    ContainerId cid = ContainerId.newContainerId(am.getApplicationAttemptId(), 1l);
+    Assert.assertEquals(cs.getRMContainer(cid).getState(), RMContainerState.ACQUIRED);
+    cid = ContainerId.newContainerId(am.getApplicationAttemptId(), 2l);
+    Assert.assertNull(cs.getRMContainer(cid));
+
+    Assert.assertEquals(AllocationState.APP_SKIPPED,
+        ContainerAllocation.APP_SKIPPED.getAllocationState());
+    Assert.assertEquals(AllocationState.LOCALITY_SKIPPED,
+        ContainerAllocation.LOCALITY_SKIPPED.getAllocationState());
+    Assert.assertEquals(AllocationState.PRIORITY_SKIPPED,
+        ContainerAllocation.PRIORITY_SKIPPED.getAllocationState());
+    Assert.assertEquals(AllocationState.QUEUE_SKIPPED,
+        ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
+  }
 }