diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 40405fc11a..e9bee148c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -417,7 +417,9 @@ public boolean accept(Resource cluster, // Common part of check container allocation regardless if it is a // increase container or regular container - commonCheckContainerAllocation(allocation, schedulerContainer); + if (!commonCheckContainerAllocation(allocation, schedulerContainer)) { + return false; + } } else { // Container reserved first time will be NEW, after the container // accepted & confirmed, it will become RESERVED state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 0c3130dc2f..77596e25be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -405,6 +405,77 @@ public Object answer(InvocationOnMock invocation) throws Exception { rm.stop(); } + @Test (timeout = 30000) + public void testNodeResourceOverAllocated() + throws Exception { + // disable async-scheduling for simulating complex scene + Configuration disableAsyncConf = new Configuration(conf); + disableAsyncConf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + + // init RM & NMs & Nodes + final MockRM rm = new MockRM(disableAsyncConf); + rm.start(); + final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB); + List nmLst = new ArrayList<>(); + nmLst.add(nm1); + nmLst.add(nm2); + + // init scheduler & nodes + while ( + ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker() + .nodeCount() < 2) { + Thread.sleep(10); + } + Assert.assertEquals(2, + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount()); + CapacityScheduler scheduler = + (CapacityScheduler) rm.getRMContext().getScheduler(); + SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // launch app + RMApp app = rm.submitApp(200, "app", "user", null, false, "default", + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + FiCaSchedulerApp schedulerApp = + scheduler.getApplicationAttempt(am.getApplicationAttemptId()); + // allocate 2 containers and running on nm1 + Resource containerResource = Resources.createResource(5 * GB); + am.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(0), "*", containerResource, 2)), + null); + + // generate over-allocated proposals for nm1 + for (int containerNo = 2; containerNo <= 3; containerNo++) { + Container container = Container.newInstance( + ContainerId.newContainerId(am.getApplicationAttemptId(), containerNo), + sn1.getNodeID(), sn1.getHttpAddress(), containerResource, + Priority.newInstance(0), null); + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.create(ResourceRequest + .newInstance(Priority.newInstance(0), "*", containerResource, 1)), + am.getApplicationAttemptId(), sn1.getNodeID(), "user", + rm.getRMContext()); + SchedulerContainer newContainer = new SchedulerContainer(schedulerApp, + scheduler.getNode(sn1.getNodeID()), rmContainer, "", true); + ContainerAllocationProposal newContainerProposal = + new ContainerAllocationProposal(newContainer, null, null, + NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, containerResource); + List newProposals = new ArrayList<>(); + newProposals.add(newContainerProposal); + ResourceCommitRequest request = + new ResourceCommitRequest(newProposals, null, null); + scheduler.tryCommit(scheduler.getClusterResource(), request); + } + // make sure node resource can't be over-allocated! + Assert.assertTrue("Node resource is Over-allocated!", + sn1.getUnallocatedResource().getMemorySize() > 0); + rm.stop(); + } + private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, Resource resource, int priority, int startContainerId) throws Exception {