diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 41ef404be6..cd0e472842 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -48,6 +48,7 @@ import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -121,6 +122,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -131,14 +133,16 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; public class TestYarnClient { - @Test - public void test() { - // More to come later. + @Before + public void setup() { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.setMiniClusterMode(true); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 925464073b..4896ab0e76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -125,8 +126,20 @@ public abstract class AbstractYarnScheduler protected SchedulerHealth schedulerHealth = new SchedulerHealth(); protected volatile long lastNodeUpdateTime; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; + private volatile Clock clock; + /** + * To enable the update thread, subclasses should set updateInterval to a + * positive value during {@link #serviceInit(Configuration)}. + */ + protected long updateInterval = -1L; + @VisibleForTesting + Thread updateThread; + private final Object updateThreadMonitor = new Object(); + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -187,9 +200,35 @@ public void serviceInit(Configuration conf) throws Exception { autoUpdateContainers = conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); + + if (updateInterval > 0) { + updateThread = new UpdateThread(); + updateThread.setName("SchedulerUpdateThread"); + updateThread.setUncaughtExceptionHandler( + new RMCriticalThreadUncaughtExceptionHandler(rmContext)); + updateThread.setDaemon(true); + } + super.serviceInit(conf); } + @Override + protected void serviceStart() throws Exception { + if (updateThread != null) { + updateThread.start(); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (updateThread != null) { + updateThread.interrupt(); + updateThread.join(THREAD_JOIN_TIMEOUT_MS); + } + super.serviceStop(); + } + @VisibleForTesting public ClusterNodeTracker getNodeTracker() { return nodeTracker; @@ -1353,4 +1392,51 @@ public long checkAndGetApplicationLifetime(String queueName, long lifetime) { public long getMaximumApplicationLifetime(String queueName) { return -1; } + + /** + * Update internal state of the scheduler. This can be useful for scheduler + * implementations that maintain some state that needs to be periodically + * updated; for example, metrics or queue resources. It will be called by the + * {@link UpdateThread} every {@link #updateInterval}. By default, it will + * not run; subclasses should set {@link #updateInterval} to a + * positive value during {@link #serviceInit(Configuration)} if they want to + * enable the thread. + */ + @VisibleForTesting + public void update() { + // do nothing by default + } + + /** + * Thread which calls {@link #update()} every + * updateInterval milliseconds. + */ + private class UpdateThread extends Thread { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + synchronized (updateThreadMonitor) { + updateThreadMonitor.wait(updateInterval); + } + update(); + } catch (InterruptedException ie) { + LOG.warn("Scheduler UpdateThread interrupted. Exiting."); + return; + } catch (Exception e) { + LOG.error("Exception in scheduler UpdateThread", e); + } + } + } + } + + /** + * Allows {@link UpdateThread} to start processing without waiting till + * {@link #updateInterval}. + */ + protected void triggerUpdate() { + synchronized (updateThreadMonitor) { + updateThreadMonitor.notify(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ebc2d3968c..a5afa96a13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -151,24 +151,14 @@ public class FairScheduler extends // reserved public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); - // How often fair shares are re-calculated (ms) - protected long updateInterval; private final int UPDATE_DEBUG_FREQUENCY = 25; private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; - @VisibleForTesting - Thread updateThread; - - private final Object updateThreadMonitor = new Object(); - @VisibleForTesting Thread schedulingThread; Thread preemptionThread; - // timeout to join when we stop this service - protected final long THREAD_JOIN_TIMEOUT_MS = 1000; - // Aggregate metrics FSQueueMetrics rootMetrics; FSOpDurations fsOpDurations; @@ -292,40 +282,6 @@ public QueueManager getQueueManager() { return queueMgr; } - // Allows UpdateThread to start processing without waiting till updateInterval - void triggerUpdate() { - synchronized (updateThreadMonitor) { - updateThreadMonitor.notify(); - } - } - - /** - * Thread which calls {@link FairScheduler#update()} every - * updateInterval milliseconds. - */ - private class UpdateThread extends Thread { - - @Override - public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - synchronized (updateThreadMonitor) { - updateThreadMonitor.wait(updateInterval); - } - long start = getClock().getTime(); - update(); - long duration = getClock().getTime() - start; - fsOpDurations.addUpdateThreadRunDuration(duration); - } catch (InterruptedException ie) { - LOG.warn("Update thread interrupted. Exiting."); - return; - } catch (Exception e) { - LOG.error("Exception in fair scheduler UpdateThread", e); - } - } - } - } - /** * Thread which attempts scheduling resources continuously, * asynchronous to the node heartbeats. @@ -367,7 +323,10 @@ private void dumpSchedulerState() { * required resources per job. */ @VisibleForTesting + @Override public void update() { + // Storing start time for fsOpDurations + long start = getClock().getTime(); FSQueue rootQueue = queueMgr.getRootQueue(); // Update demands and fairshares @@ -402,6 +361,7 @@ public void update() { } finally { readLock.unlock(); } + fsOpDurations.addUpdateThreadRunDuration(getClock().getTime() - start); } public RMContainerTokenSecretManager @@ -1339,12 +1299,6 @@ private void initScheduler(Configuration conf) throws IOException { throw new IOException("Failed to start FairScheduler", e); } - updateThread = new UpdateThread(); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setUncaughtExceptionHandler( - new RMCriticalThreadUncaughtExceptionHandler(rmContext)); - updateThread.setDaemon(true); - if (continuousSchedulingEnabled) { // start continuous scheduling thread schedulingThread = new ContinuousSchedulingThread(); @@ -1391,9 +1345,7 @@ private void updateReservationThreshold() { private void startSchedulerThreads() { try { writeLock.lock(); - Preconditions.checkNotNull(updateThread, "updateThread is null"); Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); - updateThread.start(); if (continuousSchedulingEnabled) { Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); @@ -1424,10 +1376,6 @@ public void serviceStart() throws Exception { public void serviceStop() throws Exception { try { writeLock.lock(); - if (updateThread != null) { - updateThread.interrupt(); - updateThread.join(THREAD_JOIN_TIMEOUT_MS); - } if (continuousSchedulingEnabled) { if (schedulingThread != null) { schedulingThread.interrupt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 37dc7cc6e9..b772e80fcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -817,6 +817,8 @@ PrivilegedExceptionAction setClientReq( RMAppAttemptState.SCHEDULED); } + ((AbstractYarnScheduler)getResourceScheduler()).update(); + return rmApp; } @@ -940,6 +942,7 @@ public FailApplicationAttemptResponse failApplicationAttempt( public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + ((AbstractYarnScheduler)scheduler).update(); waitForState(appAttemptId, RMAppAttemptState.ALLOCATED); //create and set AMRMToken Token amrmToken = @@ -1164,6 +1167,7 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) RMAppAttempt attempt = waitForAttemptScheduled(app, rm); LOG.info("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); + ((AbstractYarnScheduler)rm.getResourceScheduler()).update(); rm.drainEventsImplicitly(); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -1179,6 +1183,7 @@ public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); LOG.info("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); + ((AbstractYarnScheduler)rm.getResourceScheduler()).update(); rm.drainEventsImplicitly(); MockAM am = new MockAM(rm.getRMContext(), rm.masterService, attempt.getAppAttemptId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java index 00809f04f8..289ff1cebe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -18,53 +18,74 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; -import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +@RunWith(Parameterized.class) public abstract class ParameterizedSchedulerTestBase { protected final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); private final static String FS_ALLOC_FILE = new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath(); - private SchedulerType schedulerType; - private YarnConfiguration conf = null; - private AbstractYarnScheduler scheduler = null; - public enum SchedulerType { CAPACITY, FAIR } + @Parameterized.Parameters(name = "{0}") + public static Collection getParameters() { + return Arrays.stream(SchedulerType.values()).map( + type -> new Object[]{type}).collect(Collectors.toList()); + } + + private SchedulerType schedulerType; + private YarnConfiguration conf = null; + private AbstractYarnScheduler scheduler = null; + public YarnConfiguration getConf() { return conf; } - @Before - public void configureScheduler() throws IOException, ClassNotFoundException { + // Due to parameterization, this gets called before each test method + public ParameterizedSchedulerTestBase(SchedulerType type) + throws IOException { conf = new YarnConfiguration(); - Class schedulerClass = - conf.getClass(YarnConfiguration.RM_SCHEDULER, - Class.forName(YarnConfiguration.DEFAULT_RM_SCHEDULER)); + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.setMiniClusterMode(true); - if (schedulerClass == FairScheduler.class) { - schedulerType = SchedulerType.FAIR; - configureFairScheduler(conf); - scheduler = new FairScheduler(); - } else if (schedulerClass == CapacityScheduler.class) { - schedulerType = SchedulerType.CAPACITY; - scheduler = new CapacityScheduler(); - ((CapacityScheduler)scheduler).setConf(conf); + schedulerType = type; + switch (schedulerType) { + case FAIR: + configureFairScheduler(conf); + scheduler = new FairScheduler(); + conf.set(YarnConfiguration.RM_SCHEDULER, + FairScheduler.class.getName()); + break; + case CAPACITY: + scheduler = new CapacityScheduler(); + ((CapacityScheduler)scheduler).setConf(conf); + conf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + break; + default: + throw new IllegalArgumentException("Invalid type: " + type); } } @@ -85,7 +106,6 @@ private void configureFairScheduler(YarnConfiguration conf) throws IOException { out.println(""); out.close(); - conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); } @@ -97,7 +117,8 @@ public SchedulerType getSchedulerType() { /** * Return a scheduler configured by {@code YarnConfiguration.RM_SCHEDULER} * - * The scheduler is configured by {@link #configureScheduler()}. + * The scheduler is configured by + * {@link #ParameterizedSchedulerTestBase(SchedulerType)}. * Client test code can obtain the scheduler with this getter method. * Schedulers supported by this class are {@link FairScheduler} or * {@link CapacityScheduler}. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 4d8b20d69f..4ac4fc306b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.junit.After; @@ -95,6 +96,7 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { RMAppAttempt attempt = app.getCurrentAppAttempt(); nm.nodeHeartbeat(true); + ((AbstractYarnScheduler)rm.getResourceScheduler()).update(); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); am.registerAppAttempt(); rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index 526621004c..e7d666ad8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -41,11 +42,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -54,6 +58,12 @@ */ public class TestNodeBlacklistingOnAMFailures { + @Before + public void setup() { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.setMiniClusterMode(true); + } + @Test(timeout = 100000) public void testNodeBlacklistingOnAMFailure() throws Exception { @@ -361,6 +371,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { // Now the AM container should be allocated RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); + ((AbstractYarnScheduler)rm.getResourceScheduler()).update(); rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -388,6 +399,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { .println("New AppAttempt launched " + attempt.getAppAttemptId()); node.nodeHeartbeat(true); + ((AbstractYarnScheduler)rm.getResourceScheduler()).update(); rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 39313d06bd..f912f68e91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -21,11 +21,13 @@ import com.google.common.base.Supplier; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; +import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -89,6 +91,10 @@ public class TestRM extends ParameterizedSchedulerTestBase { private YarnConfiguration conf; + public TestRM(SchedulerType type) throws IOException { + super(type); + } + @Before public void setup() { conf = getConf(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 512c14a84d..588f16deef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -41,6 +41,7 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.Groups; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX; @@ -109,6 +111,9 @@ public class TestRMAdminService { @Before public void setup() throws IOException { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.setMiniClusterMode(true); + configuration = new YarnConfiguration(); configuration.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getCanonicalName()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index c9dcaef3f5..0346f4fbaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -147,6 +147,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { private static InetSocketAddress rmAddr; private List rms = new ArrayList(); + public TestRMRestart(SchedulerType type) throws IOException { + super(type); + } + @Before public void setup() throws IOException { conf = getConf(); @@ -384,6 +388,7 @@ public void testRMRestart() throws Exception { // assert app1 attempt is saved attempt1 = loadedApp1.getCurrentAppAttempt(); attemptId1 = attempt1.getAppAttemptId(); + ((AbstractYarnScheduler)rm2.getResourceScheduler()).update(); rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); appState = rmAppState.get(loadedApp1.getApplicationId()); attemptState = appState.getAttempt(attemptId1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 2c37f44e41..a13cae719d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -107,6 +107,10 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase MockRM rm1 = null; MockRM rm2 = null; + public TestWorkPreservingRMRestart(SchedulerType type) throws IOException { + super(type); + } + @Before public void setup() throws UnknownHostException { Logger rootLogger = LogManager.getRootLogger(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java index a7b7e32ff9..6c4fac8e6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java @@ -53,6 +53,10 @@ public class TestReservationSystem extends private Configuration conf; private RMContext mockRMContext; + public TestReservationSystem(SchedulerType type) throws IOException { + super(type); + } + @Before public void setUp() throws IOException { scheduler = initializeScheduler(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 6a7325c25c..3c4e6b424d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -70,6 +71,10 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { private Dispatcher dispatcher; private RMContextImpl context; + public TestNMReconnect(SchedulerType type) throws IOException { + super(type); + } + private class TestRMNodeEventDispatcher implements EventHandler { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 60b9e4bc95..979e68a25d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -84,13 +84,16 @@ @SuppressWarnings("unchecked") public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { + public TestAbstractYarnScheduler(SchedulerType type) throws IOException { + super(type); + } + @Test public void testMaximimumAllocationMemory() throws Exception { final int node1MaxMemory = 15 * 1024; final int node2MaxMemory = 5 * 1024; final int node3MaxMemory = 6 * 1024; final int configuredMaxMemory = 10 * 1024; - configureScheduler(); YarnConfiguration conf = getConf(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, configuredMaxMemory); @@ -177,7 +180,6 @@ public void testMaximimumAllocationVCores() throws Exception { final int node2MaxVCores = 5; final int node3MaxVCores = 6; final int configuredMaxVCores = 10; - configureScheduler(); YarnConfiguration conf = getConf(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, configuredMaxVCores); @@ -381,7 +383,6 @@ public void testMaxAllocationAfterUpdateNodeResource() throws IOException { @Test(timeout = 10000) public void testReleasedContainerIfAppAttemptisNull() throws Exception { YarnConfiguration conf=getConf(); - conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); MockRM rm1 = new MockRM(conf); try { rm1.start(); @@ -425,7 +426,6 @@ public void testReleasedContainerIfAppAttemptisNull() throws Exception { @Test(timeout=60000) public void testContainerReleasedByNode() throws Exception { System.out.println("Starting testContainerReleasedByNode"); - configureScheduler(); YarnConfiguration conf = getConf(); MockRM rm1 = new MockRM(conf); try { @@ -538,7 +538,6 @@ public void testContainerReleasedByNode() throws Exception { @Test(timeout = 60000) public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() throws Exception { - configureScheduler(); YarnConfiguration conf = getConf(); MockRM rm1 = new MockRM(conf); try { @@ -627,7 +626,6 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() public void testResourceRequestRecoveryToTheRightAppAttempt() throws Exception { - configureScheduler(); YarnConfiguration conf = getConf(); MockRM rm = new MockRM(conf); try { @@ -798,7 +796,6 @@ private ResourceTrackerService getPrivateResourceTrackerService( */ @Test(timeout = 60000) public void testNodemanagerReconnect() throws Exception { - configureScheduler(); Configuration conf = getConf(); MockRM rm = new MockRM(conf); try { @@ -846,4 +843,35 @@ public void testNodemanagerReconnect() throws Exception { rm.stop(); } } + + @Test(timeout = 10000) + public void testUpdateThreadLifeCycle() throws Exception { + MockRM rm = new MockRM(getConf()); + try { + rm.start(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + if (getSchedulerType().equals(SchedulerType.FAIR)) { + Thread updateThread = scheduler.updateThread; + Assert.assertTrue(updateThread.isAlive()); + scheduler.stop(); + + int numRetries = 100; + while (numRetries-- > 0 && updateThread.isAlive()) { + Thread.sleep(50); + } + + Assert.assertNotEquals("The Update thread is still alive", 0, numRetries); + } else if (getSchedulerType().equals(SchedulerType.CAPACITY)) { + Assert.assertNull("updateThread shouldn't have been created", + scheduler.updateThread); + } else { + Assert.fail("Unhandled SchedulerType, " + getSchedulerType() + + ", please update this unit test."); + } + } finally { + rm.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java index e60fd6f889..8e98332113 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.List; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -46,9 +48,23 @@ public class TestSchedulingWithAllocationRequestId LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class); private static final int GB = 1024; - @Test + public TestSchedulingWithAllocationRequestId(SchedulerType type) throws IOException { + super(type); + } + + @Override + public YarnConfiguration getConf() { + YarnConfiguration conf = super.getConf(); + if (getSchedulerType().equals(SchedulerType.FAIR)) { + // Some tests here rely on being able to assign multiple containers with + // a single heartbeat + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + } + return conf; + } + + @Test (timeout = 10000) public void testMultipleAllocationRequestIds() throws Exception { - configureScheduler(); YarnConfiguration conf = getConf(); MockRM rm = new MockRM(conf); try { @@ -63,32 +79,20 @@ public void testMultipleAllocationRequestIds() throws Exception { MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - // add request for containers with id 10 & 20 - am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L); - AllocateResponse allocResponse = am1.schedule(); // send the request - am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L); - allocResponse = am1.schedule(); // send the request + // send requests for containers with id 10 & 20 + am1.allocate(am1.createReq( + new String[] {"127.0.0.1"}, 2 * GB, 1, 1, 10L), null); + am1.allocate(am1.createReq( + new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null); // check if request id 10 is satisfied - nm1.nodeHeartbeat(true); - allocResponse = am1.schedule(); // send the request - while (allocResponse.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am1.schedule(); - } + AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1); List allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(1, allocated.size()); checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10); // check now if request id 20 is satisfied - nm2.nodeHeartbeat(true); - while (allocResponse.getAllocatedContainers().size() < 2) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am1.schedule(); - } - + allocResponse = waitForAllocResponse(rm, am1, nm2, 2); allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(2, allocated.size()); for (Container container : allocated) { @@ -101,9 +105,8 @@ public void testMultipleAllocationRequestIds() throws Exception { } } - @Test + @Test (timeout = 10000) public void testMultipleAllocationRequestDiffPriority() throws Exception { - configureScheduler(); YarnConfiguration conf = getConf(); MockRM rm = new MockRM(conf); try { @@ -118,20 +121,14 @@ public void testMultipleAllocationRequestDiffPriority() throws Exception { MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - // add request for containers with id 10 & 20 - am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L); - AllocateResponse allocResponse = am1.schedule(); // send the request - am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L); - allocResponse = am1.schedule(); // send the request + // send requests for containers with id 10 & 20 + am1.allocate(am1.createReq( + new String[] {"127.0.0.1"}, 2 * GB, 2, 1, 10L), null); + am1.allocate(am1.createReq( + new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null); // check if request id 20 is satisfied first - nm2.nodeHeartbeat(true); - while (allocResponse.getAllocatedContainers().size() < 2) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am1.schedule(); - } - + AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm2, 2); List allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(2, allocated.size()); for (Container container : allocated) { @@ -139,13 +136,7 @@ public void testMultipleAllocationRequestDiffPriority() throws Exception { } // check now if request id 10 is satisfied - nm1.nodeHeartbeat(true); - allocResponse = am1.schedule(); // send the request - while (allocResponse.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am1.schedule(); - } + allocResponse = waitForAllocResponse(rm, am1, nm1, 1); allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(1, allocated.size()); checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10); @@ -164,9 +155,8 @@ private void checkAllocatedContainer(Container allocated, int memory, allocated.getAllocationRequestId()); } - @Test + @Test (timeout = 10000) public void testMultipleAppsWithAllocationReqId() throws Exception { - configureScheduler(); YarnConfiguration conf = getConf(); MockRM rm = new MockRM(conf); try { @@ -190,19 +180,11 @@ public void testMultipleAppsWithAllocationReqId() throws Exception { // Submit app1 RR with allocationReqId = 5 int numContainers = 1; - am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers, - 5L); - AllocateResponse allocResponse = am1.schedule(); - - // wait for containers to be allocated. - nm1.nodeHeartbeat(true); - allocResponse = am1.schedule(); // send the request - while (allocResponse.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am1.schedule(); - } + am1.allocate(am1.createReq( + new String[] {host0, host1}, 1 * GB, 1, numContainers, 5L), null); + // wait for container to be allocated. + AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1); List allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(1, allocated.size()); checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L); @@ -212,55 +194,31 @@ public void testMultipleAppsWithAllocationReqId() throws Exception { MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); // Submit app2 RR with allocationReqId = 5 - am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers, - 5L); - am2.schedule(); - - // wait for containers to be allocated. - nm2.nodeHeartbeat(true); - allocResponse = am2.schedule(); // send the request - while (allocResponse.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am2.schedule(); - } + am2.allocate(am1.createReq( + new String[] {host0, host1}, 2 * GB, 1, numContainers, 5L), null); + // wait for container to be allocated. + allocResponse = waitForAllocResponse(rm, am2, nm2, 1); allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(1, allocated.size()); checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L); // Now submit app2 RR with allocationReqId = 10 - am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers, - 10L); - am2.schedule(); - - // wait for containers to be allocated. - nm1.nodeHeartbeat(true); - allocResponse = am2.schedule(); // send the request - while (allocResponse.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am2.schedule(); - } + am2.allocate(am1.createReq( + new String[] {host0, host1}, 3 * GB, 1, numContainers, 10L), null); + // wait for container to be allocated. + allocResponse = waitForAllocResponse(rm, am2, nm1, 1); allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(1, allocated.size()); checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L); // Now submit app1 RR with allocationReqId = 10 - am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers, - 10L); - am1.schedule(); - - // wait for containers to be allocated. - nm2.nodeHeartbeat(true); - allocResponse = am1.schedule(); // send the request - while (allocResponse.getAllocatedContainers().size() < 1) { - LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(100); - allocResponse = am1.schedule(); - } + am1.allocate(am1.createReq( + new String[] {host0, host1}, 4 * GB, 1, numContainers, 10L), null); + // wait for container to be allocated. + allocResponse = waitForAllocResponse(rm, am1, nm2, 1); allocated = allocResponse.getAllocatedContainers(); Assert.assertEquals(1, allocated.size()); checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L); @@ -271,4 +229,17 @@ public void testMultipleAppsWithAllocationReqId() throws Exception { } } + private AllocateResponse waitForAllocResponse(MockRM rm, MockAM am, MockNM nm, + int size) throws Exception { + AllocateResponse allocResponse = am.doHeartbeat(); + while (allocResponse.getAllocatedContainers().size() < size) { + LOG.info("Waiting for containers to be created for app..."); + nm.nodeHeartbeat(true); + ((AbstractYarnScheduler) rm.getResourceScheduler()).update(); + Thread.sleep(100); + allocResponse = am.doHeartbeat(); + } + return allocResponse; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 9efa83d99f..854a65c283 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -285,24 +285,19 @@ public void testInterruptedException() } @Test - public void testThreadLifeCycle() throws InterruptedException { + public void testSchedulerThreadLifeCycle() throws InterruptedException { scheduler.start(); - Thread updateThread = scheduler.updateThread; Thread schedulingThread = scheduler.schedulingThread; - - assertTrue(updateThread.isAlive()); assertTrue(schedulingThread.isAlive()); - scheduler.stop(); int numRetries = 100; - while (numRetries-- > 0 && - (updateThread.isAlive() || schedulingThread.isAlive())) { + while (numRetries-- > 0 && schedulingThread.isAlive()) { Thread.sleep(50); } - assertNotEquals("One of the threads is still alive", 0, numRetries); + assertNotEquals("The Scheduling thread is still alive", 0, numRetries); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java index 37fc3b3474..683173af70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -21,6 +21,7 @@ import java.util.*; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; @@ -148,7 +149,10 @@ public void testSizeBasedWeightNotAffectAppActivation() throws Exception { // Define top-level queues String queuePath = CapacitySchedulerConfiguration.ROOT + ".default"; - csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY); + csConf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getCanonicalName()); + csConf.setOrderingPolicy(queuePath, + CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY); csConf.setOrderingPolicyParameter(queuePath, FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true"); csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index d4e7727ad5..a0f4007cf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -86,6 +86,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { private YarnConfiguration conf; + public TestClientToAMTokens(SchedulerType type) throws IOException { + super(type); + } + @Before public void setup() { conf = getConf();
updateInterval
The scheduler is configured by {@link #configureScheduler()}. + *
The scheduler is configured by + * {@link #ParameterizedSchedulerTestBase(SchedulerType)}. * Client test code can obtain the scheduler with this getter method. * Schedulers supported by this class are {@link FairScheduler} or * {@link CapacityScheduler}.