YARN-2604. Scheduler should consider max-allocation-* in conjunction with the largest node. (Robert Kanter via kasha)
This commit is contained in:
parent
c298a9a845
commit
3114d4731d
@ -84,6 +84,9 @@ Release 2.7.0 - UNRELEASED
|
||||
YARN-2375. Allow enabling/disabling timeline server per framework.
|
||||
(Mit Desai via jeagles)
|
||||
|
||||
YARN-2604. Scheduler should consider max-allocation-* in conjunction
|
||||
with the largest node. (Robert Kanter via kasha)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -77,7 +78,14 @@ public abstract class AbstractYarnScheduler
|
||||
protected Resource clusterResource = Resource.newInstance(0, 0);
|
||||
|
||||
protected Resource minimumAllocation;
|
||||
protected Resource maximumAllocation;
|
||||
private Resource maximumAllocation;
|
||||
private Resource configuredMaximumAllocation;
|
||||
private int maxNodeMemory = -1;
|
||||
private int maxNodeVCores = -1;
|
||||
private ReentrantReadWriteLock maximumAllocationLock =
|
||||
new ReentrantReadWriteLock();
|
||||
private boolean useConfiguredMaximumAllocationOnly = true;
|
||||
private long configuredMaximumAllocationWaitTime;
|
||||
|
||||
protected RMContext rmContext;
|
||||
protected Map<ApplicationId, SchedulerApplication<T>> applications;
|
||||
@ -102,6 +110,9 @@ public void serviceInit(Configuration conf) throws Exception {
|
||||
nmExpireInterval =
|
||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||
configuredMaximumAllocationWaitTime =
|
||||
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
|
||||
createReleaseCache();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
@ -145,7 +156,37 @@ public Resource getMinimumResourceCapability() {
|
||||
|
||||
@Override
|
||||
public Resource getMaximumResourceCapability() {
|
||||
return maximumAllocation;
|
||||
Resource maxResource;
|
||||
ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
|
||||
readLock.lock();
|
||||
try {
|
||||
if (useConfiguredMaximumAllocationOnly) {
|
||||
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
|
||||
> configuredMaximumAllocationWaitTime) {
|
||||
useConfiguredMaximumAllocationOnly = false;
|
||||
}
|
||||
maxResource = Resources.clone(configuredMaximumAllocation);
|
||||
} else {
|
||||
maxResource = Resources.clone(maximumAllocation);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return maxResource;
|
||||
}
|
||||
|
||||
protected void initMaximumResourceCapability(Resource maximumAllocation) {
|
||||
ReentrantReadWriteLock.WriteLock writeLock =
|
||||
maximumAllocationLock.writeLock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (this.configuredMaximumAllocation == null) {
|
||||
this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
|
||||
this.maximumAllocation = Resources.clone(maximumAllocation);
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void containerLaunchedOnNode(ContainerId containerId,
|
||||
@ -528,4 +569,63 @@ public Set<String> getPlanQueues() throws YarnException {
|
||||
throw new YarnException(getClass().getSimpleName()
|
||||
+ " does not support reservations");
|
||||
}
|
||||
|
||||
protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
|
||||
ReentrantReadWriteLock.WriteLock writeLock =
|
||||
maximumAllocationLock.writeLock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (add) { // added node
|
||||
int nodeMemory = node.getAvailableResource().getMemory();
|
||||
if (nodeMemory > maxNodeMemory) {
|
||||
maxNodeMemory = nodeMemory;
|
||||
maximumAllocation.setMemory(Math.min(
|
||||
configuredMaximumAllocation.getMemory(), maxNodeMemory));
|
||||
}
|
||||
int nodeVCores = node.getAvailableResource().getVirtualCores();
|
||||
if (nodeVCores > maxNodeVCores) {
|
||||
maxNodeVCores = nodeVCores;
|
||||
maximumAllocation.setVirtualCores(Math.min(
|
||||
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
|
||||
}
|
||||
} else { // removed node
|
||||
if (maxNodeMemory == node.getAvailableResource().getMemory()) {
|
||||
maxNodeMemory = -1;
|
||||
}
|
||||
if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) {
|
||||
maxNodeVCores = -1;
|
||||
}
|
||||
// We only have to iterate through the nodes if the current max memory
|
||||
// or vcores was equal to the removed node's
|
||||
if (maxNodeMemory == -1 || maxNodeVCores == -1) {
|
||||
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
|
||||
int nodeMemory =
|
||||
nodeEntry.getValue().getAvailableResource().getMemory();
|
||||
if (nodeMemory > maxNodeMemory) {
|
||||
maxNodeMemory = nodeMemory;
|
||||
}
|
||||
int nodeVCores =
|
||||
nodeEntry.getValue().getAvailableResource().getVirtualCores();
|
||||
if (nodeVCores > maxNodeVCores) {
|
||||
maxNodeVCores = nodeVCores;
|
||||
}
|
||||
}
|
||||
if (maxNodeMemory == -1) { // no nodes
|
||||
maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
|
||||
} else {
|
||||
maximumAllocation.setMemory(
|
||||
Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
|
||||
}
|
||||
if (maxNodeVCores == -1) { // no nodes
|
||||
maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
|
||||
} else {
|
||||
maximumAllocation.setVirtualCores(
|
||||
Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,6 @@
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -284,7 +283,7 @@ private synchronized void initScheduler(Configuration configuration) throws
|
||||
this.conf = loadCapacitySchedulerConfiguration(configuration);
|
||||
validateConf(this.conf);
|
||||
this.minimumAllocation = this.conf.getMinimumAllocation();
|
||||
this.maximumAllocation = this.conf.getMaximumAllocation();
|
||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
||||
this.calculator = this.conf.getResourceCalculator();
|
||||
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
||||
this.applications =
|
||||
@ -321,8 +320,8 @@ private synchronized void startSchedulerThreads() {
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
Configuration configuration = new Configuration(conf);
|
||||
initScheduler(configuration);
|
||||
super.serviceInit(conf);
|
||||
initScheduler(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -849,7 +848,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
||||
// Sanity check
|
||||
SchedulerUtils.normalizeRequests(
|
||||
ask, getResourceCalculator(), getClusterResource(),
|
||||
getMinimumResourceCapability(), maximumAllocation);
|
||||
getMinimumResourceCapability(), getMaximumResourceCapability());
|
||||
|
||||
// Release containers
|
||||
releaseContainers(release, application);
|
||||
@ -1123,12 +1122,13 @@ private synchronized void addNode(RMNode nodeManager) {
|
||||
labelManager.activateNode(nodeManager.getNodeID(),
|
||||
nodeManager.getTotalCapability());
|
||||
}
|
||||
|
||||
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
|
||||
usePortForNodeName));
|
||||
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
|
||||
usePortForNodeName);
|
||||
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
|
||||
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
|
||||
root.updateClusterResource(clusterResource);
|
||||
int numNodes = numNodeManagers.incrementAndGet();
|
||||
updateMaximumAllocation(schedulerNode, true);
|
||||
|
||||
LOG.info("Added node " + nodeManager.getNodeAddress() +
|
||||
" clusterResource: " + clusterResource);
|
||||
@ -1177,6 +1177,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
|
||||
}
|
||||
|
||||
this.nodes.remove(nodeInfo.getNodeID());
|
||||
updateMaximumAllocation(node, false);
|
||||
|
||||
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
|
||||
" clusterResource: " + clusterResource);
|
||||
|
@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer,
|
||||
}
|
||||
|
||||
private synchronized void addNode(RMNode node) {
|
||||
nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
|
||||
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
|
||||
nodes.put(node.getNodeID(), schedulerNode);
|
||||
Resources.addTo(clusterResource, node.getTotalCapability());
|
||||
updateRootQueueMetrics();
|
||||
updateMaximumAllocation(schedulerNode, true);
|
||||
|
||||
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
|
||||
queueMgr.getRootQueue().recomputeSteadyShares();
|
||||
@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) {
|
||||
nodes.remove(rmNode.getNodeID());
|
||||
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
|
||||
queueMgr.getRootQueue().recomputeSteadyShares();
|
||||
updateMaximumAllocation(node, false);
|
||||
LOG.info("Removed node " + rmNode.getNodeAddress() +
|
||||
" cluster capacity: " + clusterResource);
|
||||
}
|
||||
@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
|
||||
|
||||
// Sanity check
|
||||
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
|
||||
clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
|
||||
clusterResource, minimumAllocation, getMaximumResourceCapability(),
|
||||
incrAllocation);
|
||||
|
||||
// Set amResource for this app
|
||||
if (!application.getUnmanagedAM() && ask.size() == 1
|
||||
@ -1225,7 +1229,7 @@ private void initScheduler(Configuration conf) throws IOException {
|
||||
this.conf = new FairSchedulerConfiguration(conf);
|
||||
validateConf(this.conf);
|
||||
minimumAllocation = this.conf.getMinimumAllocation();
|
||||
maximumAllocation = this.conf.getMaximumAllocation();
|
||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
||||
incrAllocation = this.conf.getIncrementAllocation();
|
||||
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
||||
continuousSchedulingSleepMs =
|
||||
|
@ -215,10 +215,13 @@ private synchronized void initScheduler(Configuration conf) {
|
||||
Resources.createResource(conf.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
|
||||
this.maximumAllocation =
|
||||
initMaximumResourceCapability(
|
||||
Resources.createResource(conf.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
|
||||
conf.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
|
||||
this.usePortForNodeName = conf.getBoolean(
|
||||
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
|
||||
@ -303,7 +306,7 @@ public Allocation allocate(
|
||||
|
||||
// Sanity check
|
||||
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
|
||||
clusterResource, minimumAllocation, maximumAllocation);
|
||||
clusterResource, minimumAllocation, getMaximumResourceCapability());
|
||||
|
||||
// Release containers
|
||||
releaseContainers(release, application);
|
||||
@ -899,6 +902,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
|
||||
|
||||
//Remove the node
|
||||
this.nodes.remove(nodeInfo.getNodeID());
|
||||
updateMaximumAllocation(node, false);
|
||||
|
||||
// Update cluster metrics
|
||||
Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
|
||||
@ -916,9 +920,11 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
|
||||
}
|
||||
|
||||
private synchronized void addNode(RMNode nodeManager) {
|
||||
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
|
||||
usePortForNodeName));
|
||||
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
|
||||
usePortForNodeName);
|
||||
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
|
||||
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
|
||||
updateMaximumAllocation(schedulerNode, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -47,6 +47,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
|
@ -0,0 +1,213 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||
|
||||
public TestAbstractYarnScheduler(SchedulerType type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaximimumAllocationMemory() throws Exception {
|
||||
final int node1MaxMemory = 15 * 1024;
|
||||
final int node2MaxMemory = 5 * 1024;
|
||||
final int node3MaxMemory = 6 * 1024;
|
||||
final int configuredMaxMemory = 10 * 1024;
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
configuredMaxMemory);
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
1000 * 1000);
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
testMaximumAllocationMemoryHelper(
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler(),
|
||||
node1MaxMemory, node2MaxMemory, node3MaxMemory,
|
||||
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
|
||||
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
0);
|
||||
rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
testMaximumAllocationMemoryHelper(
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler(),
|
||||
node1MaxMemory, node2MaxMemory, node3MaxMemory,
|
||||
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
|
||||
node2MaxMemory, node3MaxMemory, node2MaxMemory);
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void testMaximumAllocationMemoryHelper(
|
||||
AbstractYarnScheduler scheduler,
|
||||
final int node1MaxMemory, final int node2MaxMemory,
|
||||
final int node3MaxMemory, final int... expectedMaxMemory)
|
||||
throws Exception {
|
||||
Assert.assertEquals(6, expectedMaxMemory.length);
|
||||
|
||||
Assert.assertEquals(0, scheduler.getNumClusterNodes());
|
||||
int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
|
||||
Assert.assertEquals(expectedMaxMemory[0], maxMemory);
|
||||
|
||||
RMNode node1 = MockNodes.newNodeInfo(
|
||||
0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node1));
|
||||
Assert.assertEquals(1, scheduler.getNumClusterNodes());
|
||||
maxMemory = scheduler.getMaximumResourceCapability().getMemory();
|
||||
Assert.assertEquals(expectedMaxMemory[1], maxMemory);
|
||||
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(node1));
|
||||
Assert.assertEquals(0, scheduler.getNumClusterNodes());
|
||||
maxMemory = scheduler.getMaximumResourceCapability().getMemory();
|
||||
Assert.assertEquals(expectedMaxMemory[2], maxMemory);
|
||||
|
||||
RMNode node2 = MockNodes.newNodeInfo(
|
||||
0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node2));
|
||||
Assert.assertEquals(1, scheduler.getNumClusterNodes());
|
||||
maxMemory = scheduler.getMaximumResourceCapability().getMemory();
|
||||
Assert.assertEquals(expectedMaxMemory[3], maxMemory);
|
||||
|
||||
RMNode node3 = MockNodes.newNodeInfo(
|
||||
0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node3));
|
||||
Assert.assertEquals(2, scheduler.getNumClusterNodes());
|
||||
maxMemory = scheduler.getMaximumResourceCapability().getMemory();
|
||||
Assert.assertEquals(expectedMaxMemory[4], maxMemory);
|
||||
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(node3));
|
||||
Assert.assertEquals(1, scheduler.getNumClusterNodes());
|
||||
maxMemory = scheduler.getMaximumResourceCapability().getMemory();
|
||||
Assert.assertEquals(expectedMaxMemory[5], maxMemory);
|
||||
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(node2));
|
||||
Assert.assertEquals(0, scheduler.getNumClusterNodes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaximimumAllocationVCores() throws Exception {
|
||||
final int node1MaxVCores = 15;
|
||||
final int node2MaxVCores = 5;
|
||||
final int node3MaxVCores = 6;
|
||||
final int configuredMaxVCores = 10;
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
configuredMaxVCores);
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
1000 * 1000);
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
testMaximumAllocationVCoresHelper(
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler(),
|
||||
node1MaxVCores, node2MaxVCores, node3MaxVCores,
|
||||
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
|
||||
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
0);
|
||||
rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
testMaximumAllocationVCoresHelper(
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler(),
|
||||
node1MaxVCores, node2MaxVCores, node3MaxVCores,
|
||||
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
|
||||
node2MaxVCores, node3MaxVCores, node2MaxVCores);
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void testMaximumAllocationVCoresHelper(
|
||||
AbstractYarnScheduler scheduler,
|
||||
final int node1MaxVCores, final int node2MaxVCores,
|
||||
final int node3MaxVCores, final int... expectedMaxVCores)
|
||||
throws Exception {
|
||||
Assert.assertEquals(6, expectedMaxVCores.length);
|
||||
|
||||
Assert.assertEquals(0, scheduler.getNumClusterNodes());
|
||||
int maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
|
||||
Assert.assertEquals(expectedMaxVCores[0], maxVCores);
|
||||
|
||||
RMNode node1 = MockNodes.newNodeInfo(
|
||||
0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node1));
|
||||
Assert.assertEquals(1, scheduler.getNumClusterNodes());
|
||||
maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
|
||||
Assert.assertEquals(expectedMaxVCores[1], maxVCores);
|
||||
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(node1));
|
||||
Assert.assertEquals(0, scheduler.getNumClusterNodes());
|
||||
maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
|
||||
Assert.assertEquals(expectedMaxVCores[2], maxVCores);
|
||||
|
||||
RMNode node2 = MockNodes.newNodeInfo(
|
||||
0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node2));
|
||||
Assert.assertEquals(1, scheduler.getNumClusterNodes());
|
||||
maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
|
||||
Assert.assertEquals(expectedMaxVCores[3], maxVCores);
|
||||
|
||||
RMNode node3 = MockNodes.newNodeInfo(
|
||||
0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node3));
|
||||
Assert.assertEquals(2, scheduler.getNumClusterNodes());
|
||||
maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
|
||||
Assert.assertEquals(expectedMaxVCores[4], maxVCores);
|
||||
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(node3));
|
||||
Assert.assertEquals(1, scheduler.getNumClusterNodes());
|
||||
maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
|
||||
Assert.assertEquals(expectedMaxVCores[5], maxVCores);
|
||||
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(node2));
|
||||
Assert.assertEquals(0, scheduler.getNumClusterNodes());
|
||||
}
|
||||
|
||||
}
|
@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception {
|
||||
am1.registerAppAttempt();
|
||||
|
||||
LOG.info("sending container requests ");
|
||||
am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
|
||||
am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
|
||||
AllocateResponse alloc1Response = am1.schedule(); // send the request
|
||||
|
||||
// kick the scheduler
|
||||
@ -291,7 +291,7 @@ public Token createContainerToken(ContainerId containerId,
|
||||
// This is to test fetching AM container will be retried, if AM container is
|
||||
// not fetchable since DNS is unavailable causing container token/NMtoken
|
||||
// creation failure.
|
||||
@Test(timeout = 20000)
|
||||
@Test(timeout = 30000)
|
||||
public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
|
||||
MockRM rm1 = new MockRM(conf) {
|
||||
@Override
|
||||
|
@ -71,6 +71,7 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
@ -2590,37 +2591,7 @@ public void testNotAllowSubmitApplication() throws Exception {
|
||||
}
|
||||
assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReservationThatDoesntFit() throws IOException {
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
RMNode node1 =
|
||||
MockNodes
|
||||
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
|
||||
"user1", 1);
|
||||
scheduler.update();
|
||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(attId);
|
||||
assertEquals(0, app.getLiveContainers().size());
|
||||
assertEquals(0, app.getReservedContainers().size());
|
||||
|
||||
createSchedulingRequestExistingApplication(1024, 2, attId);
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
assertEquals(1, app.getLiveContainers().size());
|
||||
assertEquals(0, app.getReservedContainers().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
|
||||
scheduler.init(conf);
|
||||
|
Loading…
Reference in New Issue
Block a user