From 3114d4731dcca7cb6c16aaa7c7a6550b7dd7dccb Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Fri, 21 Nov 2014 10:32:28 -0800 Subject: [PATCH] YARN-2604. Scheduler should consider max-allocation-* in conjunction with the largest node. (Robert Kanter via kasha) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/AbstractYarnScheduler.java | 104 ++++++++- .../scheduler/capacity/CapacityScheduler.java | 15 +- .../scheduler/fair/FairScheduler.java | 10 +- .../scheduler/fifo/FifoScheduler.java | 16 +- .../resourcemanager/TestFifoScheduler.java | 1 + .../scheduler/TestAbstractYarnScheduler.java | 213 ++++++++++++++++++ .../capacity/TestContainerAllocation.java | 4 +- .../scheduler/fair/TestFairScheduler.java | 31 +-- 9 files changed, 348 insertions(+), 49 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6d86d7583a..5c68254d73 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -84,6 +84,9 @@ Release 2.7.0 - UNRELEASED YARN-2375. Allow enabling/disabling timeline server per framework. (Mit Desai via jeagles) + YARN-2604. Scheduler should consider max-allocation-* in conjunction + with the largest node. (Robert Kanter via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8e8d6278e6..bf720ae51d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -77,7 +78,14 @@ public abstract class AbstractYarnScheduler protected Resource clusterResource = Resource.newInstance(0, 0); protected Resource minimumAllocation; - protected Resource maximumAllocation; + private Resource maximumAllocation; + private Resource configuredMaximumAllocation; + private int maxNodeMemory = -1; + private int maxNodeVCores = -1; + private ReentrantReadWriteLock maximumAllocationLock = + new ReentrantReadWriteLock(); + private boolean useConfiguredMaximumAllocationOnly = true; + private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; protected Map> applications; @@ -102,6 +110,9 @@ public void serviceInit(Configuration conf) throws Exception { nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + configuredMaximumAllocationWaitTime = + conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); createReleaseCache(); super.serviceInit(conf); } @@ -145,7 +156,37 @@ public Resource getMinimumResourceCapability() { @Override public Resource getMaximumResourceCapability() { - return maximumAllocation; + Resource maxResource; + ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock(); + readLock.lock(); + try { + if (useConfiguredMaximumAllocationOnly) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + > configuredMaximumAllocationWaitTime) { + useConfiguredMaximumAllocationOnly = false; + } + maxResource = Resources.clone(configuredMaximumAllocation); + } else { + maxResource = Resources.clone(maximumAllocation); + } + } finally { + readLock.unlock(); + } + return maxResource; + } + + protected void initMaximumResourceCapability(Resource maximumAllocation) { + ReentrantReadWriteLock.WriteLock writeLock = + maximumAllocationLock.writeLock(); + writeLock.lock(); + try { + if (this.configuredMaximumAllocation == null) { + this.configuredMaximumAllocation = Resources.clone(maximumAllocation); + this.maximumAllocation = Resources.clone(maximumAllocation); + } + } finally { + writeLock.unlock(); + } } protected void containerLaunchedOnNode(ContainerId containerId, @@ -528,4 +569,63 @@ public Set getPlanQueues() throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support reservations"); } + + protected void updateMaximumAllocation(SchedulerNode node, boolean add) { + ReentrantReadWriteLock.WriteLock writeLock = + maximumAllocationLock.writeLock(); + writeLock.lock(); + try { + if (add) { // added node + int nodeMemory = node.getAvailableResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + maximumAllocation.setMemory(Math.min( + configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + int nodeVCores = node.getAvailableResource().getVirtualCores(); + if (nodeVCores > maxNodeVCores) { + maxNodeVCores = nodeVCores; + maximumAllocation.setVirtualCores(Math.min( + configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } + } else { // removed node + if (maxNodeMemory == node.getAvailableResource().getMemory()) { + maxNodeMemory = -1; + } + if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) { + maxNodeVCores = -1; + } + // We only have to iterate through the nodes if the current max memory + // or vcores was equal to the removed node's + if (maxNodeMemory == -1 || maxNodeVCores == -1) { + for (Map.Entry nodeEntry : nodes.entrySet()) { + int nodeMemory = + nodeEntry.getValue().getAvailableResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + } + int nodeVCores = + nodeEntry.getValue().getAvailableResource().getVirtualCores(); + if (nodeVCores > maxNodeVCores) { + maxNodeVCores = nodeVCores; + } + } + if (maxNodeMemory == -1) { // no nodes + maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); + } else { + maximumAllocation.setMemory( + Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + if (maxNodeVCores == -1) { // no nodes + maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); + } else { + maximumAllocation.setVirtualCores( + Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } + } + } + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c383e43202..28158c136f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.HashSet; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -284,7 +283,7 @@ private synchronized void initScheduler(Configuration configuration) throws this.conf = loadCapacitySchedulerConfiguration(configuration); validateConf(this.conf); this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); + initMaximumResourceCapability(this.conf.getMaximumAllocation()); this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = @@ -321,8 +320,8 @@ private synchronized void startSchedulerThreads() { @Override public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); - initScheduler(configuration); super.serviceInit(conf); + initScheduler(configuration); } @Override @@ -849,7 +848,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests( ask, getResourceCalculator(), getClusterResource(), - getMinimumResourceCapability(), maximumAllocation); + getMinimumResourceCapability(), getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -1123,12 +1122,13 @@ private synchronized void addNode(RMNode nodeManager) { labelManager.activateNode(nodeManager.getNodeID(), nodeManager.getTotalCapability()); } - - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName); + this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.incrementAndGet(); + updateMaximumAllocation(schedulerNode, true); LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -1177,6 +1177,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } this.nodes.remove(nodeInfo.getNodeID()); + updateMaximumAllocation(node, false); LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 94fb849175..55e45497a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer, } private synchronized void addNode(RMNode node) { - nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); + nodes.put(node.getNodeID(), schedulerNode); Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); + updateMaximumAllocation(schedulerNode, true); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); @@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) { nodes.remove(rmNode.getNodeID()); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); + updateMaximumAllocation(node, false); LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterResource, minimumAllocation, maximumAllocation, incrAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability(), + incrAllocation); // Set amResource for this app if (!application.getUnmanagedAM() && ask.size() == 1 @@ -1225,7 +1229,7 @@ private void initScheduler(Configuration conf) throws IOException { this.conf = new FairSchedulerConfiguration(conf); validateConf(this.conf); minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); + initMaximumResourceCapability(this.conf.getMaximumAllocation()); incrAllocation = this.conf.getIncrementAllocation(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); continuousSchedulingSleepMs = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 532edc79c5..3d4c9dd9b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -215,10 +215,13 @@ private synchronized void initScheduler(Configuration conf) { Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = + initMaximumResourceCapability( Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB), + conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES))); this.usePortForNodeName = conf.getBoolean( YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); @@ -303,7 +306,7 @@ public Allocation allocate( // Sanity check SchedulerUtils.normalizeRequests(ask, resourceCalculator, - clusterResource, minimumAllocation, maximumAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -899,6 +902,7 @@ private synchronized void removeNode(RMNode nodeInfo) { //Remove the node this.nodes.remove(nodeInfo.getNodeID()); + updateMaximumAllocation(node, false); // Update cluster metrics Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); @@ -916,9 +920,11 @@ public List getQueueUserAclInfo() { } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName); + this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + updateMaximumAllocation(schedulerNode, true); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 12f7498b7b..225e225a8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java new file mode 100644 index 0000000000..67bdae2c6c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; + +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { + + public TestAbstractYarnScheduler(SchedulerType type) { + super(type); + } + + @Test + public void testMaximimumAllocationMemory() throws Exception { + final int node1MaxMemory = 15 * 1024; + final int node2MaxMemory = 5 * 1024; + final int node3MaxMemory = 6 * 1024; + final int configuredMaxMemory = 10 * 1024; + configureScheduler(); + YarnConfiguration conf = getConf(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationMemoryHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationMemoryHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + + private void testMaximumAllocationMemoryHelper( + AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximimumAllocationVCores() throws Exception { + final int node1MaxVCores = 15; + final int node2MaxVCores = 5; + final int node3MaxVCores = 6; + final int configuredMaxVCores = 10; + configureScheduler(); + YarnConfiguration conf = getConf(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + configuredMaxVCores); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationVCoresHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxVCores, node2MaxVCores, node3MaxVCores, + configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, + configuredMaxVCores, configuredMaxVCores, configuredMaxVCores); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationVCoresHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxVCores, node2MaxVCores, node3MaxVCores, + configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, + node2MaxVCores, node3MaxVCores, node2MaxVCores); + } finally { + rm.stop(); + } + } + + private void testMaximumAllocationVCoresHelper( + AbstractYarnScheduler scheduler, + final int node1MaxVCores, final int node2MaxVCores, + final int node3MaxVCores, final int... expectedMaxVCores) + throws Exception { + Assert.assertEquals(6, expectedMaxVCores.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores(); + Assert.assertEquals(expectedMaxVCores[0], maxVCores); + + RMNode node1 = MockNodes.newNodeInfo( + 0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores(); + Assert.assertEquals(expectedMaxVCores[1], maxVCores); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores(); + Assert.assertEquals(expectedMaxVCores[2], maxVCores); + + RMNode node2 = MockNodes.newNodeInfo( + 0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores(); + Assert.assertEquals(expectedMaxVCores[3], maxVCores); + + RMNode node3 = MockNodes.newNodeInfo( + 0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores(); + Assert.assertEquals(expectedMaxVCores[4], maxVCores); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores(); + Assert.assertEquals(expectedMaxVCores[5], maxVCores); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index ad834ac1b3..9a29bff970 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception { am1.registerAppAttempt(); LOG.info("sending container requests "); - am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1); + am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1); AllocateResponse alloc1Response = am1.schedule(); // send the request // kick the scheduler @@ -291,7 +291,7 @@ public Token createContainerToken(ContainerId containerId, // This is to test fetching AM container will be retried, if AM container is // not fetchable since DNS is unavailable causing container token/NMtoken // creation failure. - @Test(timeout = 20000) + @Test(timeout = 30000) public void testAMContainerAllocationWhenDNSUnavailable() throws Exception { MockRM rm1 = new MockRM(conf) { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b5169bf45e..83440405a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -2590,37 +2591,7 @@ public void testNotAllowSubmitApplication() throws Exception { } assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus()); } - - @Test - public void testReservationThatDoesntFit() throws IOException { - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node1 = - MockNodes - .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", - "user1", 1); - scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(updateEvent); - - FSAppAttempt app = scheduler.getSchedulerApp(attId); - assertEquals(0, app.getLiveContainers().size()); - assertEquals(0, app.getReservedContainers().size()); - - createSchedulingRequestExistingApplication(1024, 2, attId); - scheduler.update(); - scheduler.handle(updateEvent); - - assertEquals(1, app.getLiveContainers().size()); - assertEquals(0, app.getReservedContainers().size()); - } - @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { scheduler.init(conf);