YARN-1287. Consolidate MockClocks. (Sebastian Wong and Anubhav Dhoot via kasha)

This commit is contained in:
Karthik Kambatla 2015-05-09 14:34:54 -07:00
parent df36ad0a08
commit 70fb37cd79
11 changed files with 82 additions and 124 deletions

View File

@ -69,13 +69,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -90,7 +90,7 @@ public class TestRuntimeEstimators {
private static int MAP_TASKS = 200; private static int MAP_TASKS = 200;
private static int REDUCE_TASKS = 150; private static int REDUCE_TASKS = 150;
MockClock clock; ControlledClock clock;
Job myJob; Job myJob;
@ -120,7 +120,7 @@ public class TestRuntimeEstimators {
private void coreTestEstimator private void coreTestEstimator
(TaskRuntimeEstimator testedEstimator, int expectedSpeculations) { (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
estimator = testedEstimator; estimator = testedEstimator;
clock = new MockClock(); clock = new ControlledClock();
dispatcher = new AsyncDispatcher(); dispatcher = new AsyncDispatcher();
myJob = null; myJob = null;
slotsInUse.set(0); slotsInUse.set(0);
@ -129,7 +129,7 @@ public class TestRuntimeEstimators {
successfulSpeculations.set(0); successfulSpeculations.set(0);
taskTimeSavedBySpeculation.set(0); taskTimeSavedBySpeculation.set(0);
clock.advanceTime(1000); clock.tickMsec(1000);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -230,7 +230,7 @@ public class TestRuntimeEstimators {
} }
} }
clock.advanceTime(1000L); clock.tickMsec(1000L);
if (clock.getTime() % 10000L == 0L) { if (clock.getTime() % 10000L == 0L) {
speculator.scanForSpeculations(); speculator.scanForSpeculations();
@ -777,22 +777,6 @@ public String getAssignedContainerMgrAddress() {
} }
} }
static class MockClock implements Clock {
private long currentTime = 0;
public long getTime() {
return currentTime;
}
void setMeasuredTime(long newTime) {
currentTime = newTime;
}
void advanceTime(long increment) {
currentTime += increment;
}
}
class MyAppMaster extends CompositeService { class MyAppMaster extends CompositeService {
final Clock clock; final Clock clock;
public MyAppMaster(Clock clock) { public MyAppMaster(Clock clock) {

View File

@ -218,6 +218,9 @@ Release 2.8.0 - UNRELEASED
YARN-1912. ResourceLocalizer started without any jvm memory control. YARN-1912. ResourceLocalizer started without any jvm memory control.
(Masatake Iwasaki via xgong) (Masatake Iwasaki via xgong)
YARN-1287. Consolidate MockClocks.
(Sebastian Wong and Anubhav Dhoot via kasha)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -21,6 +21,11 @@
public class ControlledClock implements Clock { public class ControlledClock implements Clock {
private long time = -1; private long time = -1;
private final Clock actualClock; private final Clock actualClock;
// Convenience for getting a controlled clock with overridden time
public ControlledClock() {
this(new SystemClock());
setTime(0);
}
public ControlledClock(Clock actualClock) { public ControlledClock(Clock actualClock) {
this.actualClock = actualClock; this.actualClock = actualClock;
} }
@ -30,6 +35,16 @@ public synchronized void setTime(long time) {
public synchronized void reset() { public synchronized void reset() {
time = -1; time = -1;
} }
public synchronized void tickSec(int seconds) {
tickMsec(seconds * 1000L);
}
public synchronized void tickMsec(long millisec) {
if (time == -1) {
throw new IllegalStateException("ControlledClock setTime should be " +
"called before incrementing time");
}
time = time + millisec;
}
@Override @Override
public synchronized long getTime() { public synchronized long getTime() {

View File

@ -22,10 +22,11 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.TestCGroupsHandlerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.TestCGroupsHandlerImpl;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test; import org.junit.Test;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -39,14 +40,6 @@
public class TestCgroupsLCEResourcesHandler { public class TestCgroupsLCEResourcesHandler {
static File cgroupDir = null; static File cgroupDir = null;
static class MockClock implements Clock {
long time;
@Override
public long getTime() {
return time;
}
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
cgroupDir = cgroupDir =
@ -93,8 +86,7 @@ public void testcheckAndDeleteCgroup() throws Exception {
// Verify DeleteCgroup times out if "tasks" file contains data // Verify DeleteCgroup times out if "tasks" file contains data
@Test @Test
public void testDeleteCgroup() throws Exception { public void testDeleteCgroup() throws Exception {
final MockClock clock = new MockClock(); final ControlledClock clock = new ControlledClock();
clock.time = System.currentTimeMillis();
CgroupsLCEResourcesHandler handler = new CgroupsLCEResourcesHandler(); CgroupsLCEResourcesHandler handler = new CgroupsLCEResourcesHandler();
handler.setConf(new YarnConfiguration()); handler.setConf(new YarnConfiguration());
handler.initConfig(); handler.initConfig();
@ -118,8 +110,8 @@ public void run() {
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
//NOP //NOP
} }
clock.time += YarnConfiguration. clock.tickMsec(YarnConfiguration.
DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT; DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT);
} }
}.start(); }.start();
latch.await(); latch.await();

View File

@ -49,18 +49,6 @@
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
public class FairSchedulerTestBase { public class FairSchedulerTestBase {
protected static class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
return time;
}
public void tick(int seconds) {
time = time + seconds * 1000;
}
}
public final static String TEST_DIR = public final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test; import org.junit.Test;
@ -43,18 +43,6 @@ public class TestAllocationFileLoaderService {
final static String ALLOC_FILE = new File(TEST_DIR, final static String ALLOC_FILE = new File(TEST_DIR,
"test-queues").getAbsolutePath(); "test-queues").getAbsolutePath();
private class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
return time;
}
public void tick(long ms) {
time += ms;
}
}
@Test @Test
public void testGetAllocationFileFromClasspath() { public void testGetAllocationFileFromClasspath() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -81,7 +69,8 @@ public void testReload() throws Exception {
out.println("</allocations>"); out.println("</allocations>");
out.close(); out.close();
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
clock.setTime(0);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@ -126,7 +115,7 @@ public void testReload() throws Exception {
out.println("</allocations>"); out.println("</allocations>");
out.close(); out.close();
clock.tick(System.currentTimeMillis() clock.tickMsec(System.currentTimeMillis()
+ AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000); + AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000);
allocLoader.start(); allocLoader.start();

View File

@ -19,20 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.util.SampleStat;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -41,11 +37,10 @@
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
public class TestContinuousScheduling extends FairSchedulerTestBase { public class TestContinuousScheduling extends FairSchedulerTestBase {
private MockClock mockClock; private ControlledClock mockClock;
@Override @Override
public Configuration createConfiguration() { public Configuration createConfiguration() {
@ -59,7 +54,7 @@ public Configuration createConfiguration() {
@Before @Before
public void setup() { public void setup() {
mockClock = new MockClock(); mockClock = new ControlledClock();
conf = createConfiguration(); conf = createConfiguration();
resourceManager = new MockRM(conf); resourceManager = new MockRM(conf);
resourceManager.start(); resourceManager.start();
@ -108,7 +103,7 @@ public void testSchedulingDelay() throws InterruptedException {
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
// Advance time and let continuous scheduling kick in // Advance time and let continuous scheduling kick in
mockClock.tick(1); mockClock.tickSec(1);
while (1024 != app.getCurrentConsumption().getMemory()) { while (1024 != app.getCurrentConsumption().getMemory()) {
Thread.sleep(100); Thread.sleep(100);
} }

View File

@ -31,7 +31,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -39,19 +40,6 @@
public class TestFSAppAttempt extends FairSchedulerTestBase { public class TestFSAppAttempt extends FairSchedulerTestBase {
private class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
return time;
}
public void tick(int seconds) {
time = time + seconds * 1000;
}
}
@Before @Before
public void setup() { public void setup() {
Configuration conf = createConfiguration(); Configuration conf = createConfiguration();
@ -125,7 +113,7 @@ public void testDelaySchedulingForContinuousScheduling()
Priority prio = Mockito.mock(Priority.class); Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1); Mockito.when(prio.getPriority()).thenReturn(1);
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds
@ -143,13 +131,13 @@ public void testDelaySchedulingForContinuousScheduling()
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// after 4 seconds should remain node local // after 4 seconds should remain node local
clock.tick(4); clock.tickSec(4);
assertEquals(NodeType.NODE_LOCAL, assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio, schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// after 6 seconds should switch to rack local // after 6 seconds should switch to rack local
clock.tick(2); clock.tickSec(2);
assertEquals(NodeType.RACK_LOCAL, assertEquals(NodeType.RACK_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio, schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
@ -162,12 +150,12 @@ public void testDelaySchedulingForContinuousScheduling()
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// Now escalate again to rack-local, then to off-switch // Now escalate again to rack-local, then to off-switch
clock.tick(6); clock.tickSec(6);
assertEquals(NodeType.RACK_LOCAL, assertEquals(NodeType.RACK_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio, schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
clock.tick(7); clock.tickSec(7);
assertEquals(NodeType.OFF_SWITCH, assertEquals(NodeType.OFF_SWITCH,
schedulerApp.getAllowedLocalityLevelByTime(prio, schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));

View File

@ -72,7 +72,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -100,6 +99,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -1489,7 +1490,7 @@ public void testChoiceOfPreemptedContainers() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -1587,7 +1588,7 @@ public void testChoiceOfPreemptedContainers() throws Exception {
scheduler.getSchedulerApp(app2).getPreemptionContainers())); scheduler.getSchedulerApp(app2).getPreemptionContainers()));
// Pretend 15 seconds have passed // Pretend 15 seconds have passed
clock.tick(15); clock.tickSec(15);
// Trigger a kill by insisting we want containers back // Trigger a kill by insisting we want containers back
scheduler.preemptResources(Resources.createResource(2 * 1024)); scheduler.preemptResources(Resources.createResource(2 * 1024));
@ -1617,7 +1618,7 @@ public void testChoiceOfPreemptedContainers() throws Exception {
scheduler.preemptResources(Resources.createResource(2 * 1024)); scheduler.preemptResources(Resources.createResource(2 * 1024));
// Pretend 15 seconds have passed // Pretend 15 seconds have passed
clock.tick(15); clock.tickSec(15);
// We should be able to claw back another container from A and B each. // We should be able to claw back another container from A and B each.
// For queueA (fifo), continue preempting from app2. // For queueA (fifo), continue preempting from app2.
@ -1649,7 +1650,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -1702,7 +1703,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
scheduler.update(); scheduler.update();
// Let 11 sec pass // Let 11 sec pass
clock.tick(11); clock.tickSec(11);
scheduler.update(); scheduler.update();
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
@ -1722,7 +1723,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
*/ */
public void testPreemptionDecision() throws Exception { public void testPreemptionDecision() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -1833,7 +1834,7 @@ public void testPreemptionDecision() throws Exception {
Resources.none(), scheduler.resToPreempt(schedD, clock.getTime()))); Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
// After minSharePreemptionTime has passed, they should want to preempt min // After minSharePreemptionTime has passed, they should want to preempt min
// share. // share.
clock.tick(6); clock.tickSec(6);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); 1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -1842,7 +1843,7 @@ public void testPreemptionDecision() throws Exception {
// After fairSharePreemptionTime has passed, they should want to preempt // After fairSharePreemptionTime has passed, they should want to preempt
// fair share. // fair share.
scheduler.update(); scheduler.update();
clock.tick(6); clock.tickSec(6);
assertEquals( assertEquals(
1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); 1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -1855,7 +1856,7 @@ public void testPreemptionDecision() throws Exception {
*/ */
public void testPreemptionDecisionWithVariousTimeout() throws Exception { public void testPreemptionDecisionWithVariousTimeout() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -1971,7 +1972,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
// After 5 seconds, queueB1 wants to preempt min share // After 5 seconds, queueB1 wants to preempt min share
scheduler.update(); scheduler.update();
clock.tick(6); clock.tickSec(6);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -1981,7 +1982,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
// After 10 seconds, queueB2 wants to preempt min share // After 10 seconds, queueB2 wants to preempt min share
scheduler.update(); scheduler.update();
clock.tick(5); clock.tickSec(5);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -1991,7 +1992,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
// After 15 seconds, queueC wants to preempt min share // After 15 seconds, queueC wants to preempt min share
scheduler.update(); scheduler.update();
clock.tick(5); clock.tickSec(5);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -2001,7 +2002,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
// After 20 seconds, queueB2 should want to preempt fair share // After 20 seconds, queueB2 should want to preempt fair share
scheduler.update(); scheduler.update();
clock.tick(5); clock.tickSec(5);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -2011,7 +2012,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
// After 25 seconds, queueB1 should want to preempt fair share // After 25 seconds, queueB1 should want to preempt fair share
scheduler.update(); scheduler.update();
clock.tick(5); clock.tickSec(5);
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -2021,7 +2022,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
// After 30 seconds, queueC should want to preempt fair share // After 30 seconds, queueC should want to preempt fair share
scheduler.update(); scheduler.update();
clock.tick(5); clock.tickSec(5);
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
@ -3703,7 +3704,7 @@ public void testQueueMaxAMShareDefault() throws Exception {
@Test @Test
public void testMaxRunningAppsHierarchicalQueues() throws Exception { public void testMaxRunningAppsHierarchicalQueues() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@ -3728,27 +3729,27 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
verifyAppRunnable(attId1, true); verifyAppRunnable(attId1, true);
verifyQueueNumRunnable("queue1.sub1", 1, 0); verifyQueueNumRunnable("queue1.sub1", 1, 0);
clock.tick(10); clock.tickSec(10);
// exceeds no limits // exceeds no limits
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1"); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1");
verifyAppRunnable(attId2, true); verifyAppRunnable(attId2, true);
verifyQueueNumRunnable("queue1.sub3", 1, 0); verifyQueueNumRunnable("queue1.sub3", 1, 0);
clock.tick(10); clock.tickSec(10);
// exceeds no limits // exceeds no limits
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1.sub2", "user1"); ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1.sub2", "user1");
verifyAppRunnable(attId3, true); verifyAppRunnable(attId3, true);
verifyQueueNumRunnable("queue1.sub2", 1, 0); verifyQueueNumRunnable("queue1.sub2", 1, 0);
clock.tick(10); clock.tickSec(10);
// exceeds queue1 limit // exceeds queue1 limit
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1"); ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1");
verifyAppRunnable(attId4, false); verifyAppRunnable(attId4, false);
verifyQueueNumRunnable("queue1.sub2", 1, 1); verifyQueueNumRunnable("queue1.sub2", 1, 1);
clock.tick(10); clock.tickSec(10);
// exceeds sub3 limit // exceeds sub3 limit
ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1.sub3", "user1"); ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1.sub3", "user1");
verifyAppRunnable(attId5, false); verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1.sub3", 1, 1); verifyQueueNumRunnable("queue1.sub3", 1, 1);
clock.tick(10); clock.tickSec(10);
// Even though the app was removed from sub3, the app from sub2 gets to go // Even though the app was removed from sub3, the app from sub2 gets to go
// because it came in first // because it came in first
@ -3923,7 +3924,7 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
public void testRecoverRequestAfterPreemption() throws Exception { public void testRecoverRequestAfterPreemption() throws Exception {
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
MockClock clock = new MockClock(); ControlledClock clock = new ControlledClock();
scheduler.setClock(clock); scheduler.setClock(clock);
scheduler.init(conf); scheduler.init(conf);
scheduler.start(); scheduler.start();
@ -3974,7 +3975,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
scheduler.warnOrKillContainer(rmContainer); scheduler.warnOrKillContainer(rmContainer);
// Wait for few clock ticks // Wait for few clock ticks
clock.tick(5); clock.tickSec(5);
// preempt now // preempt now
scheduler.warnOrKillContainer(rmContainer); scheduler.warnOrKillContainer(rmContainer);

View File

@ -26,6 +26,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -43,7 +45,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR, private final static String ALLOC_FILE = new File(TEST_DIR,
TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath(); TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
private MockClock clock; private ControlledClock clock;
private static class StubbedFairScheduler extends FairScheduler { private static class StubbedFairScheduler extends FairScheduler {
public int lastPreemptMemory = -1; public int lastPreemptMemory = -1;
@ -70,7 +72,7 @@ public Configuration createConfiguration() {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
conf = createConfiguration(); conf = createConfiguration();
clock = new MockClock(); clock = new ControlledClock();
} }
@After @After
@ -148,7 +150,7 @@ public void testPreemptionWithFreeResources() throws Exception {
// Verify submitting another request triggers preemption // Verify submitting another request triggers preemption
createSchedulingRequest(1024, "queueB", "user1", 1, 1); createSchedulingRequest(1024, "queueB", "user1", 1, 1);
scheduler.update(); scheduler.update();
clock.tick(6); clock.tickSec(6);
((StubbedFairScheduler) scheduler).resetLastPreemptResources(); ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
scheduler.preemptTasksIfNecessary(); scheduler.preemptTasksIfNecessary();
@ -164,7 +166,7 @@ public void testPreemptionWithFreeResources() throws Exception {
// Verify submitting another request doesn't trigger preemption // Verify submitting another request doesn't trigger preemption
createSchedulingRequest(1024, "queueB", "user1", 1, 1); createSchedulingRequest(1024, "queueB", "user1", 1, 1);
scheduler.update(); scheduler.update();
clock.tick(6); clock.tickSec(6);
((StubbedFairScheduler) scheduler).resetLastPreemptResources(); ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
scheduler.preemptTasksIfNecessary(); scheduler.preemptTasksIfNecessary();
@ -180,7 +182,7 @@ public void testPreemptionWithFreeResources() throws Exception {
// Verify submitting another request triggers preemption // Verify submitting another request triggers preemption
createSchedulingRequest(1024, "queueB", "user1", 1, 1); createSchedulingRequest(1024, "queueB", "user1", 1, 1);
scheduler.update(); scheduler.update();
clock.tick(6); clock.tickSec(6);
((StubbedFairScheduler) scheduler).resetLastPreemptResources(); ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
scheduler.preemptTasksIfNecessary(); scheduler.preemptTasksIfNecessary();

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -40,14 +41,14 @@ public class TestMaxRunningAppsEnforcer {
private Map<String, Integer> userMaxApps; private Map<String, Integer> userMaxApps;
private MaxRunningAppsEnforcer maxAppsEnforcer; private MaxRunningAppsEnforcer maxAppsEnforcer;
private int appNum; private int appNum;
private TestFairScheduler.MockClock clock; private ControlledClock clock;
private RMContext rmContext; private RMContext rmContext;
private FairScheduler scheduler; private FairScheduler scheduler;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
clock = new TestFairScheduler.MockClock(); clock = new ControlledClock();
scheduler = mock(FairScheduler.class); scheduler = mock(FairScheduler.class);
when(scheduler.getConf()).thenReturn( when(scheduler.getConf()).thenReturn(
new FairSchedulerConfiguration(conf)); new FairSchedulerConfiguration(conf));
@ -151,7 +152,7 @@ public void testRemoveEnablingOrderedByStartTime() {
FSAppAttempt app1 = addApp(leaf1, "user"); FSAppAttempt app1 = addApp(leaf1, "user");
addApp(leaf2, "user"); addApp(leaf2, "user");
addApp(leaf2, "user"); addApp(leaf2, "user");
clock.tick(20); clock.tickSec(20);
addApp(leaf1, "user"); addApp(leaf1, "user");
assertEquals(1, leaf1.getNumRunnableApps()); assertEquals(1, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps()); assertEquals(1, leaf2.getNumRunnableApps());