diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 7e668b4a68..9b70e5392a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -184,7 +184,7 @@ public ResourceCalculator getResourceCalculator() {
   }
 
   @Override
-  public void editSchedule() {
+  public synchronized void editSchedule() {
     CSQueue root = scheduler.getRootQueue();
     Resource clusterResources = Resources.clone(scheduler.getClusterResource());
     containerBasedPreemptOrKill(root, clusterResources);
@@ -192,7 +192,8 @@ public void editSchedule() {
 
   @SuppressWarnings("unchecked")
   private void preemptOrkillSelectedContainerAfterWait(
-      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      long currentTime) {
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
         .entrySet()) {
@@ -204,8 +205,8 @@ private void preemptOrkillSelectedContainerAfterWait(
       for (RMContainer container : e.getValue()) {
         // if we tried to preempt this for more than maxWaitTime
         if (preemptionCandidates.get(container) != null
-            && preemptionCandidates.get(container) + maxWaitTime < clock
-            .getTime()) {
+            && preemptionCandidates.get(container)
+                + maxWaitTime <= currentTime) {
           // kill it
           rmContext.getDispatcher().getEventHandler().handle(
               new ContainerPreemptEvent(appAttemptId, container,
@@ -221,7 +222,7 @@ private void preemptOrkillSelectedContainerAfterWait(
           rmContext.getDispatcher().getEventHandler().handle(
               new ContainerPreemptEvent(appAttemptId, container,
                   SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
-          preemptionCandidates.put(container, clock.getTime());
+          preemptionCandidates.put(container, currentTime);
         }
       }
     }
@@ -243,13 +244,15 @@ private void syncKillableContainersFromScheduler() {
     }
   }
 
-  private void cleanupStaledPreemptionCandidates() {
+  private void cleanupStaledPreemptionCandidates(long currentTime) {
     // Keep the preemptionCandidates list clean
     for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator();
         i.hasNext(); ) {
       RMContainer id = i.next();
       // garbage collect containers that are irrelevant for preemption
-      if (preemptionCandidates.get(id) + 2 * maxWaitTime < clock.getTime()) {
+      // And avoid preempt selected containers for *this execution*
+      // or within 1 ms
+      if (preemptionCandidates.get(id) + 2 * maxWaitTime < currentTime) {
         i.remove();
       }
     }
@@ -335,11 +338,13 @@ private void containerBasedPreemptOrKill(CSQueue root,
     // containers. The bottom line is, we shouldn't preempt a queue which is already
     // below its guaranteed resource.
 
+    long currentTime = clock.getTime();
+
     // preempt (or kill) the selected containers
-    preemptOrkillSelectedContainerAfterWait(toPreempt);
+    preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime);
 
     // cleanup staled preemption candidates
-    cleanupStaledPreemptionCandidates();
+    cleanupStaledPreemptionCandidates(currentTime);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
index 216ebabbed..1a0adc65d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
@@ -56,9 +55,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestCapacitySchedulerPreemption {
   private static final Log LOG = LogFactory.getLog(
       TestCapacitySchedulerPreemption.class);
@@ -69,8 +65,6 @@ public class TestCapacitySchedulerPreemption {
 
   RMNodeLabelsManager mgr;
 
-  Clock clock;
-
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -84,6 +78,8 @@ public void setUp() throws Exception {
     // Set preemption related configurations
     conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
         0);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        60000L);
     conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
         true);
     conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
@@ -93,8 +89,6 @@ public void setUp() throws Exception {
         1.0f);
     mgr = new NullRMNodeLabelsManager();
     mgr.init(this.conf);
-    clock = mock(Clock.class);
-    when(clock.getTime()).thenReturn(0L);
   }
 
   private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {