diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 962e040a61..9a7de65f19 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -132,6 +132,9 @@ Release 2.8.0 - UNRELEASED YARN-3425. NPE from RMNodeLabelsManager.serviceStop when NodeLabelsManager.serviceInit failed. (Bibin A Chundatt via wangda) + YARN-3415. Non-AM containers can be counted towards amResourceUsage of a + Fair Scheduler queue (Zhihai Xu via Sandy Ryza) + Release 2.7.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 46617ff5cd..f0d1ed182d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -523,8 +523,11 @@ private Resource assignContainer( // Inform the node node.allocateContainer(allocatedContainer); - // If this container is used to run AM, update the leaf queue's AM usage - if (getLiveContainers().size() == 1 && !getUnmanagedAM()) { + // If not running unmanaged, the first container we allocate is always + // the AM. Set the amResource for this app and update the leaf queue's AM + // usage + if (!isAmRunning() && !getUnmanagedAM()) { + setAMResource(container.getResource()); getQueue().addAMResourceUsage(container.getResource()); setAmRunning(true); } @@ -551,6 +554,19 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); } + // Check the AM resource usage for the leaf queue + if (!isAmRunning() && !getUnmanagedAM()) { + List ask = appSchedulingInfo.getAllResourceRequests(); + if (ask.isEmpty() || !getQueue().canRunAppAM( + ask.get(0).getCapability())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping allocation because maxAMShare limit would " + + "be exceeded"); + } + return Resources.none(); + } + } + Collection prioritiesToTry = (reserved) ? Arrays.asList(node.getReservedContainer().getReservedPriority()) : getPriorities(); @@ -567,17 +583,6 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { addSchedulingOpportunity(priority); - // Check the AM resource usage for the leaf queue - if (getLiveContainers().size() == 0 && !getUnmanagedAM()) { - if (!getQueue().canRunAppAM(getAMResource())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping allocation because maxAMShare limit would " + - "be exceeded"); - } - return Resources.none(); - } - } - ResourceRequest rackLocalRequest = getResourceRequest(priority, node.getRackName()); ResourceRequest localRequest = getResourceRequest(priority, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index c49a3231d9..04dbd2f57f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -124,8 +124,9 @@ public boolean removeApp(FSAppAttempt app) { writeLock.unlock(); } - // Update AM resource usage if needed - if (runnable && app.isAmRunning() && app.getAMResource() != null) { + // Update AM resource usage if needed. If isAMRunning is true, we're not + // running an unmanaged AM. + if (runnable && app.isAmRunning()) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 04c7f70e6f..a6c5416c64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -901,12 +901,6 @@ clusterResource, minimumAllocation, getMaximumResourceCapability(), // Record container allocation start time application.recordContainerRequestTime(getClock().getTime()); - // Set amResource for this app - if (!application.getUnmanagedAM() && ask.size() == 1 - && application.getLiveContainers().isEmpty()) { - application.setAMResource(ask.get(0).getCapability()); - } - // Release containers releaseContainers(release, application); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ff215cdfbe..b5bfb8c614 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -3548,8 +3549,8 @@ public void testQueueMaxAMShare() throws Exception { FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application3's AM requests 1024 MB memory", - 1024, app3.getAMResource().getMemory()); + assertEquals("Application3's AM resource shouldn't be updated", + 0, app3.getAMResource().getMemory()); assertEquals("Application3's AM should not be running", 0, app3.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3574,6 +3575,8 @@ public void testQueueMaxAMShare() throws Exception { 0, app1.getLiveContainers().size()); assertEquals("Application3's AM should be running", 1, app3.getLiveContainers().size()); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3584,8 +3587,8 @@ public void testQueueMaxAMShare() throws Exception { FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application4's AM requests 2048 MB memory", - 2048, app4.getAMResource().getMemory()); + assertEquals("Application4's AM resource shouldn't be updated", + 0, app4.getAMResource().getMemory()); assertEquals("Application4's AM should not be running", 0, app4.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3598,8 +3601,8 @@ public void testQueueMaxAMShare() throws Exception { FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application5's AM requests 2048 MB memory", - 2048, app5.getAMResource().getMemory()); + assertEquals("Application5's AM resource shouldn't be updated", + 0, app5.getAMResource().getMemory()); assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3631,6 +3634,33 @@ public void testQueueMaxAMShare() throws Exception { 0, app3.getLiveContainers().size()); assertEquals("Application5's AM should be running", 1, app5.getLiveContainers().size()); + assertEquals("Application5's AM requests 2048 MB memory", + 2048, app5.getAMResource().getMemory()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // request non-AM container for app5 + createSchedulingRequestExistingApplication(1024, 1, attId5); + assertEquals("Application5's AM should have 1 container", + 1, app5.getLiveContainers().size()); + // complete AM container before non-AM container is allocated. + // spark application hit this situation. + RMContainer amContainer5 = (RMContainer)app5.getLiveContainers().toArray()[0]; + ContainerExpiredSchedulerEvent containerExpired = + new ContainerExpiredSchedulerEvent(amContainer5.getContainerId()); + scheduler.handle(containerExpired); + assertEquals("Application5's AM should have 0 container", + 0, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + scheduler.update(); + scheduler.handle(updateEvent); + // non-AM container should be allocated + // check non-AM container allocation is not rejected + // due to queue MaxAMShare limitation. + assertEquals("Application5 should have 1 container", + 1, app5.getLiveContainers().size()); + // check non-AM container allocation won't affect queue AmResourceUsage assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3643,8 +3673,8 @@ public void testQueueMaxAMShare() throws Exception { scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", 0, app6.getLiveContainers().size()); - assertEquals("Application6's AM requests 2048 MB memory", - 2048, app6.getAMResource().getMemory()); + assertEquals("Application6's AM resource shouldn't be updated", + 0, app6.getAMResource().getMemory()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3748,8 +3778,8 @@ public void testQueueMaxAMShareDefault() throws Exception { FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application2's AM requests 1024 MB memory", - 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM resource shouldn't be updated", + 0, app2.getAMResource().getMemory()); assertEquals("Application2's AM should not be running", 0, app2.getLiveContainers().size()); assertEquals("Queue2's AM resource usage should be 0 MB memory",