YARN-2393. FairScheduler: Add the notion of steady fair share. (Wei Yan via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1619845 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3aa3b0abc2
commit
0097b15e21
@ -55,6 +55,9 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2174. Enable HTTPs for the writer REST API of TimelineServer.
|
||||
(Zhijie Shen via jianhe)
|
||||
|
||||
YARN-2393. FairScheduler: Add the notion of steady fair share.
|
||||
(Wei Yan via kasha)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
|
||||
|
@ -717,12 +717,6 @@ public void setFairShare(Resource fairShare) {
|
||||
this.fairShare = fairShare;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void updateDemand() {
|
||||
demand = Resources.createResource(0);
|
||||
|
@ -35,7 +35,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
|
||||
@ -68,6 +67,16 @@ public void recomputeShares() {
|
||||
}
|
||||
}
|
||||
|
||||
public void recomputeSteadyShares() {
|
||||
policy.computeSteadyShares(childQueues, getSteadyFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
|
||||
if (childQueue instanceof FSParentQueue) {
|
||||
((FSParentQueue) childQueue).recomputeSteadyShares();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getDemand() {
|
||||
return demand;
|
||||
|
@ -41,6 +41,7 @@
|
||||
@Unstable
|
||||
public abstract class FSQueue implements Queue, Schedulable {
|
||||
private Resource fairShare = Resources.createResource(0, 0);
|
||||
private Resource steadyFairShare = Resources.createResource(0, 0);
|
||||
private final String name;
|
||||
protected final FairScheduler scheduler;
|
||||
private final FSQueueMetrics metrics;
|
||||
@ -151,7 +152,17 @@ public void setFairShare(Resource fairShare) {
|
||||
this.fairShare = fairShare;
|
||||
metrics.setFairShare(fairShare);
|
||||
}
|
||||
|
||||
|
||||
/** Get the steady fair share assigned to this Schedulable. */
|
||||
public Resource getSteadyFairShare() {
|
||||
return steadyFairShare;
|
||||
}
|
||||
|
||||
public void setSteadyFairShare(Resource steadyFairShare) {
|
||||
this.steadyFairShare = steadyFairShare;
|
||||
metrics.setSteadyFairShare(steadyFairShare);
|
||||
}
|
||||
|
||||
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
|
||||
}
|
||||
@ -161,7 +172,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||
* queue's current share
|
||||
*/
|
||||
public abstract void recomputeShares();
|
||||
|
||||
|
||||
/**
|
||||
* Gets the children of this queue, if any.
|
||||
*/
|
||||
@ -194,7 +205,9 @@ protected boolean assignContainerPreCheck(FSSchedulerNode node) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Returns true if queue has at least one app running.
|
||||
*/
|
||||
public boolean isActive() {
|
||||
return getNumRunnableApps() > 0;
|
||||
}
|
||||
|
@ -33,6 +33,8 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||
|
||||
@Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
|
||||
@Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
|
||||
@Metric("Steady fair share of memory in MB") MutableGaugeInt steadyFairShareMB;
|
||||
@Metric("Steady fair share of CPU in vcores") MutableGaugeInt steadyFairShareVCores;
|
||||
@Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
|
||||
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
|
||||
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
|
||||
@ -55,7 +57,20 @@ public int getFairShareMB() {
|
||||
public int getFairShareVirtualCores() {
|
||||
return fairShareVCores.value();
|
||||
}
|
||||
|
||||
|
||||
public void setSteadyFairShare(Resource resource) {
|
||||
steadyFairShareMB.set(resource.getMemory());
|
||||
steadyFairShareVCores.set(resource.getVirtualCores());
|
||||
}
|
||||
|
||||
public int getSteadyFairShareMB() {
|
||||
return steadyFairShareMB.value();
|
||||
}
|
||||
|
||||
public int getSteadyFairShareVCores() {
|
||||
return steadyFairShareVCores.value();
|
||||
}
|
||||
|
||||
public void setMinShare(Resource resource) {
|
||||
minShareMB.set(resource.getMemory());
|
||||
minShareVCores.set(resource.getVirtualCores());
|
||||
|
@ -851,6 +851,8 @@ private synchronized void addNode(RMNode node) {
|
||||
Resources.addTo(clusterResource, node.getTotalCapability());
|
||||
updateRootQueueMetrics();
|
||||
|
||||
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
|
||||
queueMgr.getRootQueue().recomputeSteadyShares();
|
||||
LOG.info("Added node " + node.getNodeAddress() +
|
||||
" cluster capacity: " + clusterResource);
|
||||
}
|
||||
@ -885,6 +887,8 @@ private synchronized void removeNode(RMNode rmNode) {
|
||||
}
|
||||
|
||||
nodes.remove(rmNode.getNodeID());
|
||||
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
|
||||
queueMgr.getRootQueue().recomputeSteadyShares();
|
||||
LOG.info("Removed node " + rmNode.getNodeAddress() +
|
||||
" cluster capacity: " + clusterResource);
|
||||
}
|
||||
|
@ -118,6 +118,11 @@ private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
|
||||
if (queue == null && create) {
|
||||
// if the queue doesn't exist,create it and return
|
||||
queue = createQueue(name, queueType);
|
||||
|
||||
// Update steady fair share for all queues
|
||||
if (queue != null) {
|
||||
rootQueue.recomputeSteadyShares();
|
||||
}
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
@ -190,7 +195,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
|
||||
parent = newParent;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return parent;
|
||||
}
|
||||
|
||||
@ -376,5 +381,8 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
||||
+ queue.getName(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
// Update steady fair shares for all queues
|
||||
rootQueue.recomputeSteadyShares();
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
* A Schedulable represents an entity that can be scheduled such as an
|
||||
@ -102,10 +101,4 @@ public interface Schedulable {
|
||||
|
||||
/** Assign a fair share to this Schedulable. */
|
||||
public void setFairShare(Resource fairShare);
|
||||
|
||||
/**
|
||||
* Returns true if queue has atleast one app running. Always returns true for
|
||||
* AppSchedulables.
|
||||
*/
|
||||
public boolean isActive();
|
||||
}
|
||||
|
@ -17,10 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
@ -29,6 +25,10 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract class SchedulingPolicy {
|
||||
@ -131,8 +131,10 @@ public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
|
||||
public abstract Comparator<Schedulable> getComparator();
|
||||
|
||||
/**
|
||||
* Computes and updates the shares of {@link Schedulable}s as per the
|
||||
* {@link SchedulingPolicy}, to be used later at schedule time.
|
||||
* Computes and updates the shares of {@link Schedulable}s as per
|
||||
* the {@link SchedulingPolicy}, to be used later for scheduling decisions.
|
||||
* The shares computed are instantaneous and only consider queues with
|
||||
* running applications.
|
||||
*
|
||||
* @param schedulables {@link Schedulable}s whose shares are to be updated
|
||||
* @param totalResources Total {@link Resource}s in the cluster
|
||||
@ -140,6 +142,19 @@ public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
|
||||
public abstract void computeShares(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources);
|
||||
|
||||
/**
|
||||
* Computes and updates the steady shares of {@link FSQueue}s as per the
|
||||
* {@link SchedulingPolicy}. The steady share does not differentiate
|
||||
* between queues with and without running applications under them. The
|
||||
* steady share is not used for scheduling, it is displayed on the Web UI
|
||||
* for better visibility.
|
||||
*
|
||||
* @param queues {@link FSQueue}s whose shares are to be updated
|
||||
* @param totalResources Total {@link Resource}s in the cluster
|
||||
*/
|
||||
public abstract void computeSteadyShares(
|
||||
Collection<? extends FSQueue> queues, Resource totalResources);
|
||||
|
||||
/**
|
||||
* Check if the resource usage is over the fair share under this policy
|
||||
*
|
||||
|
@ -22,6 +22,7 @@
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
|
||||
/**
|
||||
@ -49,14 +50,29 @@ public static void computeShares(
|
||||
ResourceType type) {
|
||||
Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
|
||||
for (Schedulable sched : schedulables) {
|
||||
if (sched.isActive()) {
|
||||
activeSchedulables.add(sched);
|
||||
} else {
|
||||
if ((sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
|
||||
setResourceValue(0, sched.getFairShare(), type);
|
||||
} else {
|
||||
activeSchedulables.add(sched);
|
||||
}
|
||||
}
|
||||
|
||||
computeSharesInternal(activeSchedulables, totalResources, type);
|
||||
computeSharesInternal(activeSchedulables, totalResources, type, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the steady fair share of the given queues. The steady fair
|
||||
* share is an allocation of shares considering all queues, i.e.,
|
||||
* active and inactive.
|
||||
*
|
||||
* @param queues
|
||||
* @param totalResources
|
||||
* @param type
|
||||
*/
|
||||
public static void computeSteadyShares(
|
||||
Collection<? extends FSQueue> queues, Resource totalResources,
|
||||
ResourceType type) {
|
||||
computeSharesInternal(queues, totalResources, type, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -102,7 +118,7 @@ public static void computeShares(
|
||||
*/
|
||||
private static void computeSharesInternal(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources,
|
||||
ResourceType type) {
|
||||
ResourceType type, boolean isSteadyShare) {
|
||||
if (schedulables.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
@ -145,7 +161,13 @@ private static void computeSharesInternal(
|
||||
}
|
||||
// Set the fair shares based on the value of R we've converged to
|
||||
for (Schedulable sched : schedulables) {
|
||||
setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type);
|
||||
if (isSteadyShare) {
|
||||
setResourceValue(computeShare(sched, right, type),
|
||||
((FSQueue) sched).getSteadyFairShare(), type);
|
||||
} else {
|
||||
setResourceValue(
|
||||
computeShare(sched, right, type), sched.getFairShare(), type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
@ -68,6 +69,14 @@ public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
ComputeFairShares.computeShares(schedulables, totalResources, type);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeSteadyShares(Collection<? extends FSQueue> queues,
|
||||
Resource totalResources) {
|
||||
for (ResourceType type : ResourceType.values()) {
|
||||
ComputeFairShares.computeSteadyShares(queues, totalResources, type);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
|
||||
|
@ -25,6 +25,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
@ -119,6 +120,13 @@ public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeSteadyShares(Collection<? extends FSQueue> queues,
|
||||
Resource totalResources) {
|
||||
ComputeFairShares.computeSteadyShares(queues, totalResources,
|
||||
ResourceType.MEMORY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
|
||||
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
@ -87,6 +88,13 @@ public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
earliest.setFairShare(Resources.clone(totalResources));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeSteadyShares(Collection<? extends FSQueue> queues,
|
||||
Resource totalResources) {
|
||||
// Nothing needs to do, as leaf queue doesn't have to calculate steady
|
||||
// fair shares for applications.
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
|
||||
throw new UnsupportedOperationException(
|
||||
|
@ -100,11 +100,6 @@ public void setFairShare(Resource fairShare) {
|
||||
this.fairShare = fairShare;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getDemand() {
|
||||
return null;
|
||||
|
@ -292,14 +292,19 @@ public void testSimpleFairShareCalculation() throws IOException {
|
||||
createSchedulingRequest(10 * 1024, "root.default", "user1");
|
||||
|
||||
scheduler.update();
|
||||
scheduler.getQueueManager().getRootQueue()
|
||||
.setSteadyFairShare(scheduler.getClusterResource());
|
||||
scheduler.getQueueManager().getRootQueue().recomputeSteadyShares();
|
||||
|
||||
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
|
||||
assertEquals(3, queues.size());
|
||||
|
||||
// Divided three ways - betwen the two queues and the default queue
|
||||
// Divided three ways - between the two queues and the default queue
|
||||
for (FSLeafQueue p : queues) {
|
||||
assertEquals(3414, p.getFairShare().getMemory());
|
||||
assertEquals(3414, p.getMetrics().getFairShareMB());
|
||||
assertEquals(3414, p.getSteadyFairShare().getMemory());
|
||||
assertEquals(3414, p.getMetrics().getSteadyFairShareMB());
|
||||
}
|
||||
}
|
||||
|
||||
@ -323,6 +328,9 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException {
|
||||
createSchedulingRequest(10 * 1024, "root.default", "user1");
|
||||
|
||||
scheduler.update();
|
||||
scheduler.getQueueManager().getRootQueue()
|
||||
.setSteadyFairShare(scheduler.getClusterResource());
|
||||
scheduler.getQueueManager().getRootQueue().recomputeSteadyShares();
|
||||
|
||||
QueueManager queueManager = scheduler.getQueueManager();
|
||||
Collection<FSLeafQueue> queues = queueManager.getLeafQueues();
|
||||
@ -333,10 +341,16 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException {
|
||||
FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3", true);
|
||||
assertEquals(capacity / 2, queue1.getFairShare().getMemory());
|
||||
assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB());
|
||||
assertEquals(capacity / 2, queue1.getSteadyFairShare().getMemory());
|
||||
assertEquals(capacity / 2, queue1.getMetrics().getSteadyFairShareMB());
|
||||
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
|
||||
assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB());
|
||||
assertEquals(capacity / 4, queue2.getSteadyFairShare().getMemory());
|
||||
assertEquals(capacity / 4, queue2.getMetrics().getSteadyFairShareMB());
|
||||
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
|
||||
assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB());
|
||||
assertEquals(capacity / 4, queue3.getSteadyFairShare().getMemory());
|
||||
assertEquals(capacity / 4, queue3.getMetrics().getSteadyFairShareMB());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -771,6 +785,9 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
|
||||
createSchedulingRequest(10 * 1024, "root.default", "user3");
|
||||
|
||||
scheduler.update();
|
||||
scheduler.getQueueManager().getRootQueue()
|
||||
.setSteadyFairShare(scheduler.getClusterResource());
|
||||
scheduler.getQueueManager().getRootQueue().recomputeSteadyShares();
|
||||
|
||||
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
|
||||
.getLeafQueues();
|
||||
@ -780,12 +797,128 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
|
||||
|| leaf.getName().equals("root.parentq.user2")) {
|
||||
// assert that the fair share is 1/4th node1's capacity
|
||||
assertEquals(capacity / 4, leaf.getFairShare().getMemory());
|
||||
// assert that the steady fair share is 1/4th node1's capacity
|
||||
assertEquals(capacity / 4, leaf.getSteadyFairShare().getMemory());
|
||||
// assert weights are equal for both the user queues
|
||||
assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSteadyFairShareWithReloadAndNodeAddRemove() throws Exception {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
||||
out.println("<queue name=\"root\">");
|
||||
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||
out.println(" <queue name=\"child1\">");
|
||||
out.println(" <weight>1</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"child2\">");
|
||||
out.println(" <weight>1</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// The steady fair share for all queues should be 0
|
||||
QueueManager queueManager = scheduler.getQueueManager();
|
||||
assertEquals(0, queueManager.getLeafQueue("child1", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
assertEquals(0, queueManager.getLeafQueue("child2", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
|
||||
// Add one node
|
||||
RMNode node1 =
|
||||
MockNodes
|
||||
.newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
assertEquals(6144, scheduler.getClusterResource().getMemory());
|
||||
|
||||
// The steady fair shares for all queues should be updated
|
||||
assertEquals(2048, queueManager.getLeafQueue("child1", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
assertEquals(2048, queueManager.getLeafQueue("child2", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
|
||||
// Reload the allocation configuration file
|
||||
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
||||
out.println("<queue name=\"root\">");
|
||||
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||
out.println(" <queue name=\"child1\">");
|
||||
out.println(" <weight>1</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"child2\">");
|
||||
out.println(" <weight>2</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"child3\">");
|
||||
out.println(" <weight>2</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// The steady fair shares for all queues should be updated
|
||||
assertEquals(1024, queueManager.getLeafQueue("child1", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
assertEquals(2048, queueManager.getLeafQueue("child2", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
assertEquals(2048, queueManager.getLeafQueue("child3", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
|
||||
// Remove the node, steady fair shares should back to 0
|
||||
NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent2);
|
||||
assertEquals(0, scheduler.getClusterResource().getMemory());
|
||||
assertEquals(0, queueManager.getLeafQueue("child1", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
assertEquals(0, queueManager.getLeafQueue("child2", false)
|
||||
.getSteadyFairShare().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSteadyFairShareWithQueueCreatedRuntime() throws Exception {
|
||||
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one node
|
||||
RMNode node1 =
|
||||
MockNodes
|
||||
.newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
assertEquals(6144, scheduler.getClusterResource().getMemory());
|
||||
assertEquals(6144, scheduler.getQueueManager().getRootQueue()
|
||||
.getSteadyFairShare().getMemory());
|
||||
assertEquals(6144, scheduler.getQueueManager()
|
||||
.getLeafQueue("default", false).getSteadyFairShare().getMemory());
|
||||
|
||||
// Submit one application
|
||||
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
|
||||
createApplicationWithAMResource(appAttemptId1, "default", "user1", null);
|
||||
assertEquals(3072, scheduler.getQueueManager()
|
||||
.getLeafQueue("default", false).getSteadyFairShare().getMemory());
|
||||
assertEquals(3072, scheduler.getQueueManager()
|
||||
.getLeafQueue("user1", false).getSteadyFairShare().getMemory());
|
||||
}
|
||||
|
||||
/**
|
||||
* Make allocation requests and ensure they are reflected in queue demand.
|
||||
*/
|
||||
@ -873,7 +1006,7 @@ public void testAppAdditionAndRemoval() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException,
|
||||
public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException,
|
||||
AllocationConfigurationException, ParserConfigurationException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
|
@ -109,13 +109,15 @@ public void testFairShareNoAppsRunning() throws IOException {
|
||||
|
||||
for (FSLeafQueue leaf : leafQueues) {
|
||||
if (leaf.getName().startsWith("root.parentA")) {
|
||||
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity
|
||||
* 100, 0);
|
||||
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity,
|
||||
0);
|
||||
} else if (leaf.getName().startsWith("root.parentB")) {
|
||||
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity
|
||||
* 100, 0.1);
|
||||
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity,
|
||||
0);
|
||||
}
|
||||
}
|
||||
|
||||
verifySteadyFairShareMemory(leafQueues, nodeCapacity);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -135,14 +137,15 @@ public void testFairShareOneAppRunning() throws IOException {
|
||||
100,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA1", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0.1);
|
||||
.getMemory() / nodeCapacity * 100, 0.1);
|
||||
assertEquals(
|
||||
0,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA2", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0.1);
|
||||
.getMemory() / nodeCapacity, 0.1);
|
||||
|
||||
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
|
||||
nodeCapacity);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -167,6 +170,9 @@ public void testFairShareMultipleActiveQueuesUnderSameParent()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, .9);
|
||||
}
|
||||
|
||||
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
|
||||
nodeCapacity);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -206,6 +212,9 @@ public void testFairShareMultipleActiveQueuesUnderDifferentParent()
|
||||
.getLeafQueue("root.parentB.childB1", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, .9);
|
||||
|
||||
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
|
||||
nodeCapacity);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -253,6 +262,9 @@ public void testFairShareResetsToZeroWhenAppsComplete() throws IOException {
|
||||
.getLeafQueue("root.parentA.childA2", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0.1);
|
||||
|
||||
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
|
||||
nodeCapacity);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -304,5 +316,45 @@ public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
|
||||
.getLeafQueue("root.parentB.childB1", false).getFairShare()
|
||||
.getVirtualCores()
|
||||
/ nodeVCores * 100, .9);
|
||||
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
|
||||
.getLeafQueues();
|
||||
|
||||
for (FSLeafQueue leaf : leafQueues) {
|
||||
if (leaf.getName().startsWith("root.parentA")) {
|
||||
assertEquals(0.2,
|
||||
(double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001);
|
||||
assertEquals(0.2,
|
||||
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
|
||||
0.001);
|
||||
} else if (leaf.getName().startsWith("root.parentB")) {
|
||||
assertEquals(0.05,
|
||||
(double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001);
|
||||
assertEquals(0.1,
|
||||
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
|
||||
0.001);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify whether steady fair shares for all leaf queues still follow
|
||||
* their weight, not related to active/inactive status.
|
||||
*
|
||||
* @param leafQueues
|
||||
* @param nodeCapacity
|
||||
*/
|
||||
private void verifySteadyFairShareMemory(Collection<FSLeafQueue> leafQueues,
|
||||
int nodeCapacity) {
|
||||
for (FSLeafQueue leaf : leafQueues) {
|
||||
if (leaf.getName().startsWith("root.parentA")) {
|
||||
assertEquals(0.2,
|
||||
(double) leaf.getSteadyFairShare().getMemory() / nodeCapacity,
|
||||
0.001);
|
||||
} else if (leaf.getName().startsWith("root.parentB")) {
|
||||
assertEquals(0.05,
|
||||
(double) leaf.getSteadyFairShare().getMemory() / nodeCapacity,
|
||||
0.001);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user