From fbfe86deea5f2aa857cd13fee913b7becee57f93 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Sun, 26 Feb 2017 20:36:33 -0800 Subject: [PATCH] YARN-6172. FSLeafQueue demand update needs to be atomic. (Miklos Szegedi via kasha) --- .../scheduler/fair/FSLeafQueue.java | 21 +++++----------- .../scheduler/fair/TestFSAppStarvation.java | 24 +++++++++++++++---- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index d0e0961010..aad291619c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -331,20 +331,22 @@ Resource getAmResourceUsage() { public void updateDemand() { // Compute demand by iterating through apps in the queue // Limit demand to maxResources - demand = Resources.createResource(0); + Resource tmpDemand = Resources.createResource(0); readLock.lock(); try { for (FSAppAttempt sched : runnableApps) { - updateDemandForApp(sched); + sched.updateDemand(); + Resources.addTo(tmpDemand, sched.getDemand()); } for (FSAppAttempt sched : nonRunnableApps) { - updateDemandForApp(sched); + sched.updateDemand(); + Resources.addTo(tmpDemand, sched.getDemand()); } } finally { readLock.unlock(); } // Cap demand to maxShare to limit allocation to maxShare - demand = Resources.componentwiseMin(demand, maxShare); + demand = Resources.componentwiseMin(tmpDemand, maxShare); if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + "; the max is " + maxShare); @@ -352,17 +354,6 @@ public void updateDemand() { + getFairShare()); } } - - private void updateDemandForApp(FSAppAttempt sched) { - sched.updateDemand(); - Resource toAdd = sched.getDemand(); - if (LOG.isDebugEnabled()) { - LOG.debug("Counting resource from " + sched.getName() + " " + toAdd - + "; Total resource demand for " + getName() + " now " - + demand); - } - demand = Resources.add(demand, toAdd); - } @Override public Resource assignContainer(FSSchedulerNode node) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java index 2eacc9ee3b..0712b4cda4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -96,6 +96,14 @@ public void testPreemptionDisabled() throws Exception { public void testPreemptionEnabled() throws Exception { setupClusterAndSubmitJobs(); + // Wait for apps to be processed by MockPreemptionThread + for (int i = 0; i < 6000; ++i) { + if (preemptionThread.uniqueAppsAdded() >= 3) { + break; + } + Thread.sleep(10); + } + assertNotNull("FSContext does not have an FSStarvedApps instance", scheduler.getContext().getStarvedApps()); assertEquals("Expecting 3 starved applications, one each for the " @@ -113,8 +121,19 @@ public void testPreemptionEnabled() throws Exception { clock.tickMsec( FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS); scheduler.update(); + + // Wait for apps to be processed by MockPreemptionThread + for (int i = 0; i < 6000; ++i) { + if(preemptionThread.totalAppsAdded() > + preemptionThread.uniqueAppsAdded()) { + break; + } + Thread.sleep(10); + } + assertTrue("Each app is marked as starved exactly once", - preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded()); + preemptionThread.totalAppsAdded() > + preemptionThread.uniqueAppsAdded()); } /* @@ -154,9 +173,6 @@ private void setupClusterAndSubmitJobs() throws Exception { // Scheduler update to populate starved apps scheduler.update(); - - // Wait for apps to be processed by MockPreemptionThread - Thread.yield(); } /**