YARN-4846. Fix random failures for TestCapacitySchedulerPreemption#testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers. (Bibin A Chundatt via wangda)

This commit is contained in:
Wangda Tan 2016-04-22 11:40:32 -07:00
parent d6402faded
commit 7cb3a3da96
2 changed files with 16 additions and 17 deletions

View File

@@ -184,7 +184,7 @@ public ResourceCalculator getResourceCalculator() {
} }
@Override @Override
public void editSchedule() { public synchronized void editSchedule() {
CSQueue root = scheduler.getRootQueue(); CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resources.clone(scheduler.getClusterResource()); Resource clusterResources = Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources); containerBasedPreemptOrKill(root, clusterResources);
@@ -192,7 +192,8 @@ public void editSchedule() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void preemptOrkillSelectedContainerAfterWait( private void preemptOrkillSelectedContainerAfterWait(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) { Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
long currentTime) {
// preempt (or kill) the selected containers // preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
.entrySet()) { .entrySet()) {
@@ -204,8 +205,8 @@ private void preemptOrkillSelectedContainerAfterWait(
for (RMContainer container : e.getValue()) { for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime // if we tried to preempt this for more than maxWaitTime
if (preemptionCandidates.get(container) != null if (preemptionCandidates.get(container) != null
&& preemptionCandidates.get(container) + maxWaitTime < clock && preemptionCandidates.get(container)
.getTime()) { + maxWaitTime <= currentTime) {
// kill it // kill it
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container, new ContainerPreemptEvent(appAttemptId, container,
@@ -221,7 +222,7 @@ private void preemptOrkillSelectedContainerAfterWait(
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container, new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
preemptionCandidates.put(container, clock.getTime()); preemptionCandidates.put(container, currentTime);
} }
} }
} }
@@ -243,13 +244,15 @@ private void syncKillableContainersFromScheduler() {
} }
} }
private void cleanupStaledPreemptionCandidates() { private void cleanupStaledPreemptionCandidates(long currentTime) {
// Keep the preemptionCandidates list clean // Keep the preemptionCandidates list clean
for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator(); for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator();
i.hasNext(); ) { i.hasNext(); ) {
RMContainer id = i.next(); RMContainer id = i.next();
// garbage collect containers that are irrelevant for preemption // garbage collect containers that are irrelevant for preemption
if (preemptionCandidates.get(id) + 2 * maxWaitTime < clock.getTime()) { // And avoid preempt selected containers for *this execution*
// or within 1 ms
if (preemptionCandidates.get(id) + 2 * maxWaitTime < currentTime) {
i.remove(); i.remove();
} }
} }
@@ -335,11 +338,13 @@ private void containerBasedPreemptOrKill(CSQueue root,
// containers. The bottom line is, we shouldn't preempt a queue which is already // containers. The bottom line is, we shouldn't preempt a queue which is already
// below its guaranteed resource. // below its guaranteed resource.
long currentTime = clock.getTime();
// preempt (or kill) the selected containers // preempt (or kill) the selected containers
preemptOrkillSelectedContainerAfterWait(toPreempt); preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime);
// cleanup staled preemption candidates // cleanup staled preemption candidates
cleanupStaledPreemptionCandidates(); cleanupStaledPreemptionCandidates(currentTime);
} }
@Override @Override

View File

@@ -43,7 +43,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@@ -56,9 +55,6 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCapacitySchedulerPreemption { public class TestCapacitySchedulerPreemption {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
TestCapacitySchedulerPreemption.class); TestCapacitySchedulerPreemption.class);
@@ -69,8 +65,6 @@ public class TestCapacitySchedulerPreemption {
RMNodeLabelsManager mgr; RMNodeLabelsManager mgr;
Clock clock;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
@@ -84,6 +78,8 @@ public void setUp() throws Exception {
// Set preemption related configurations // Set preemption related configurations
conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
0); 0);
conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
60000L);
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
true); true);
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
@@ -93,8 +89,6 @@ public void setUp() throws Exception {
1.0f); 1.0f);
mgr = new NullRMNodeLabelsManager(); mgr = new NullRMNodeLabelsManager();
mgr.init(this.conf); mgr.init(this.conf);
clock = mock(Clock.class);
when(clock.getTime()).thenReturn(0L);
} }
private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) { private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {