diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 822624f927..7487f714d4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -454,6 +454,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup failure during commitJob. (Junping Du via wangda) + YARN-3920. FairScheduler container reservation on a node should be + configurable to limit it to large containers (adhoot via asuresh) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index cfec9157f8..7af1891e85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -543,10 +543,23 @@ private Resource assignContainer( return container.getResource(); } - // The desired container won't fit here, so reserve - reserve(request.getPriority(), node, container, reserved); + if (isReservable(container)) { + // The desired container won't fit here, so reserve + reserve(request.getPriority(), node, container, reserved); - return FairScheduler.CONTAINER_RESERVED; + return FairScheduler.CONTAINER_RESERVED; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not creating reservation as container " + container.getId() + + " is not reservable"); + } + return Resources.none(); + } + } + + private boolean isReservable(Container container) { + return scheduler.isAtLeastReservationThreshold( + getQueue().getPolicy().getResourceCalculator(), container.getResource()); } private boolean hasNodeOrRackLocalRequests(Priority priority) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3a39799065..a083272eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -194,7 +194,11 @@ public class FairScheduler extends private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; - + + // Container size threshold for making a reservation. + @VisibleForTesting + Resource reservationThreshold; + public FairScheduler() { super(FairScheduler.class.getName()); clock = new SystemClock(); @@ -203,6 +207,12 @@ public FairScheduler() { maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } + public boolean isAtLeastReservationThreshold( + ResourceCalculator resourceCalculator, Resource resource) { + return Resources.greaterThanOrEqual( + resourceCalculator, clusterResource, resource, reservationThreshold); + } + private void validateConf(Configuration conf) { // validate scheduler memory allocation setting int minMem = conf.getInt( @@ -1325,6 +1335,7 @@ private void initScheduler(Configuration conf) throws IOException { minimumAllocation = this.conf.getMinimumAllocation(); initMaximumResourceCapability(this.conf.getMaximumAllocation()); incrAllocation = this.conf.getIncrementAllocation(); + updateReservationThreshold(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs(); @@ -1391,6 +1402,14 @@ private void initScheduler(Configuration conf) throws IOException { } } + private void updateReservationThreshold() { + Resource newThreshold = Resources.multiply( + getIncrementResourceCapability(), + this.conf.getReservationThresholdIncrementMultiple()); + + reservationThreshold = newThreshold; + } + private synchronized void startSchedulerThreads() { Preconditions.checkNotNull(updateThread, "updateThread is null"); Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index e477e6e4e2..892484d55c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.File; -import java.util.ArrayList; -import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -49,7 +47,17 @@ public class FairSchedulerConfiguration extends Configuration { public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores"; public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1; - + + /** Threshold for container size for making a container reservation as a + * multiple of increment allocation. Only container sizes above this are + * allowed to reserve a node */ + public static final String + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE = + YarnConfiguration.YARN_PREFIX + + "scheduler.reservation-threshold.increment-multiple"; + public static final float + DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f; + private static final String CONF_PREFIX = "yarn.scheduler.fair."; public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file"; @@ -166,7 +174,13 @@ public Resource getIncrementAllocation() { DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES); return Resources.createResource(incrementMemory, incrementCores); } - + + public float getReservationThresholdIncrementMultiple() { + return getFloat( + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE); + } + public float getLocalityThresholdNode() { return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 1c9801d763..dd7ed41669 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -64,6 +64,7 @@ public class FairSchedulerTestBase { protected Configuration conf; protected FairScheduler scheduler; protected ResourceManager resourceManager; + public static final float TEST_RESERVATION_THRESHOLD = 0.09f; // Helper methods public Configuration createConfiguration() { @@ -76,6 +77,11 @@ public Configuration createConfiguration() { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + + conf.setFloat( + FairSchedulerConfiguration + .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + TEST_RESERVATION_THRESHOLD); return conf; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a02cf18b8f..ad54616e6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -710,9 +710,10 @@ public void testSimpleContainerAllocation() throws IOException { scheduler.handle(updateEvent); // Asked for less than increment allocation. - assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + assertEquals( + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); @@ -764,7 +765,7 @@ public void testSimpleContainerReservation() throws Exception { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity @@ -939,8 +940,88 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception { getResourceUsage().getMemory()); } - + @Test + public void testReservationThresholdGatesReservations() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + // Set threshold to 2 * 1024 ==> 2048 MB & 2 * 1 ==> 2 vcores (test will + // use vcores) + conf.setFloat(FairSchedulerConfiguration. + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + 2f); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(4096, 4), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue 1 requests full capacity of node + createSchedulingRequest(4096, 4, "queue1", "user1", 1, 1); + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + + scheduler.handle(updateEvent); + + // Make sure queue 1 is allocated app capacity + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + + // Now queue 2 requests below threshold + ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 2 has no reservation + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + assertEquals(0, + scheduler.getSchedulerApp(attId).getReservedContainers().size()); + + // Now queue requests CPU above threshold + createSchedulingRequestExistingApplication(1024, 3, 1, attId); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 2 is waiting with a reservation + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() + .getVirtualCores()); + + // Now another node checks in with capacity + RMNode node2 = + MockNodes + .newNodeInfo(1, Resources.createResource(1024, 4), 2, "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + scheduler.handle(updateEvent2); + + // Make sure this goes to queue 2 + assertEquals(3, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getVirtualCores()); + + // The old reservation should still be there... + assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() + .getVirtualCores()); + // ... but it should disappear when we update the first node. + scheduler.handle(updateEvent); + assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation() + .getVirtualCores()); + } @Test public void testEmptyQueueName() throws Exception {