YARN-2913. Fair scheduler should have ability to set MaxResourceDefault for each queue. (Siqi Li via mingma)
This commit is contained in:
parent
f8adeb712d
commit
934d96a334
@ -531,6 +531,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-4243. Add retry on establishing Zookeeper conenction in
|
YARN-4243. Add retry on establishing Zookeeper conenction in
|
||||||
EmbeddedElectorService#serviceInit. (Xuan Gong via junping_du)
|
EmbeddedElectorService#serviceInit. (Xuan Gong via junping_du)
|
||||||
|
|
||||||
|
YARN-2913. Fair scheduler should have ability to set MaxResourceDefault for
|
||||||
|
each queue. (Siqi Li via mingma)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
@ -29,6 +29,8 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
@ -36,7 +38,8 @@
|
|||||||
public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
|
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
|
||||||
private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
|
private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
|
||||||
|
private static final ResourceCalculator RESOURCE_CALCULATOR =
|
||||||
|
new DefaultResourceCalculator();
|
||||||
// Minimum resource allocation for each queue
|
// Minimum resource allocation for each queue
|
||||||
private final Map<String, Resource> minQueueResources;
|
private final Map<String, Resource> minQueueResources;
|
||||||
// Maximum amount of resources per queue
|
// Maximum amount of resources per queue
|
||||||
@ -53,6 +56,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||||||
final Map<String, Integer> userMaxApps;
|
final Map<String, Integer> userMaxApps;
|
||||||
private final int userMaxAppsDefault;
|
private final int userMaxAppsDefault;
|
||||||
private final int queueMaxAppsDefault;
|
private final int queueMaxAppsDefault;
|
||||||
|
private final Resource queueMaxResourcesDefault;
|
||||||
|
|
||||||
// Maximum resource share for each leaf queue that can be used to run AMs
|
// Maximum resource share for each leaf queue that can be used to run AMs
|
||||||
final Map<String, Float> queueMaxAMShares;
|
final Map<String, Float> queueMaxAMShares;
|
||||||
@ -99,7 +103,8 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
|||||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||||
Map<String, ResourceWeights> queueWeights,
|
Map<String, ResourceWeights> queueWeights,
|
||||||
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
|
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
|
||||||
int queueMaxAppsDefault, float queueMaxAMShareDefault,
|
int queueMaxAppsDefault, Resource queueMaxResourcesDefault,
|
||||||
|
float queueMaxAMShareDefault,
|
||||||
Map<String, SchedulingPolicy> schedulingPolicies,
|
Map<String, SchedulingPolicy> schedulingPolicies,
|
||||||
SchedulingPolicy defaultSchedulingPolicy,
|
SchedulingPolicy defaultSchedulingPolicy,
|
||||||
Map<String, Long> minSharePreemptionTimeouts,
|
Map<String, Long> minSharePreemptionTimeouts,
|
||||||
@ -117,6 +122,7 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
|||||||
this.queueMaxAMShares = queueMaxAMShares;
|
this.queueMaxAMShares = queueMaxAMShares;
|
||||||
this.queueWeights = queueWeights;
|
this.queueWeights = queueWeights;
|
||||||
this.userMaxAppsDefault = userMaxAppsDefault;
|
this.userMaxAppsDefault = userMaxAppsDefault;
|
||||||
|
this.queueMaxResourcesDefault = queueMaxResourcesDefault;
|
||||||
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
||||||
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
|
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
|
||||||
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
|
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
|
||||||
@ -140,6 +146,7 @@ public AllocationConfiguration(Configuration conf) {
|
|||||||
queueMaxAMShares = new HashMap<String, Float>();
|
queueMaxAMShares = new HashMap<String, Float>();
|
||||||
userMaxAppsDefault = Integer.MAX_VALUE;
|
userMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
queueMaxAppsDefault = Integer.MAX_VALUE;
|
queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
|
queueMaxResourcesDefault = Resources.unbounded();
|
||||||
queueMaxAMShareDefault = 0.5f;
|
queueMaxAMShareDefault = 0.5f;
|
||||||
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
|
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||||
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||||
@ -243,7 +250,18 @@ public Resource getMinResources(String queue) {
|
|||||||
|
|
||||||
public Resource getMaxResources(String queueName) {
|
public Resource getMaxResources(String queueName) {
|
||||||
Resource maxQueueResource = maxQueueResources.get(queueName);
|
Resource maxQueueResource = maxQueueResources.get(queueName);
|
||||||
return (maxQueueResource == null) ? Resources.unbounded() : maxQueueResource;
|
if (maxQueueResource == null) {
|
||||||
|
Resource minQueueResource = minQueueResources.get(queueName);
|
||||||
|
if (minQueueResource != null &&
|
||||||
|
Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(),
|
||||||
|
minQueueResource, queueMaxResourcesDefault)) {
|
||||||
|
return minQueueResource;
|
||||||
|
} else {
|
||||||
|
return queueMaxResourcesDefault;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return maxQueueResource;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasAccess(String queueName, QueueACL acl,
|
public boolean hasAccess(String queueName, QueueACL acl,
|
||||||
|
@ -202,7 +202,8 @@ public synchronized void setReloadListener(Listener reloadListener) {
|
|||||||
* @throws SAXException if config file is malformed.
|
* @throws SAXException if config file is malformed.
|
||||||
*/
|
*/
|
||||||
public synchronized void reloadAllocations() throws IOException,
|
public synchronized void reloadAllocations() throws IOException,
|
||||||
ParserConfigurationException, SAXException, AllocationConfigurationException {
|
ParserConfigurationException, SAXException,
|
||||||
|
AllocationConfigurationException {
|
||||||
if (allocFile == null) {
|
if (allocFile == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -225,6 +226,7 @@ public synchronized void reloadAllocations() throws IOException,
|
|||||||
Set<String> reservableQueues = new HashSet<String>();
|
Set<String> reservableQueues = new HashSet<String>();
|
||||||
int userMaxAppsDefault = Integer.MAX_VALUE;
|
int userMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
|
Resource queueMaxResourcesDefault = Resources.unbounded();
|
||||||
float queueMaxAMShareDefault = 0.5f;
|
float queueMaxAMShareDefault = 0.5f;
|
||||||
long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
|
long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
@ -282,6 +284,11 @@ public synchronized void reloadAllocations() throws IOException,
|
|||||||
userMaxApps.put(userName, val);
|
userMaxApps.put(userName, val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if ("queueMaxResourcesDefault".equals(element.getTagName())) {
|
||||||
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
|
Resource val =
|
||||||
|
FairSchedulerConfiguration.parseResourceConfigValue(text);
|
||||||
|
queueMaxResourcesDefault = val;
|
||||||
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
|
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
int val = Integer.parseInt(text);
|
int val = Integer.parseInt(text);
|
||||||
@ -398,9 +405,9 @@ public synchronized void reloadAllocations() throws IOException,
|
|||||||
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
|
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
|
||||||
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
|
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
|
||||||
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
|
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
|
||||||
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
|
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
|
||||||
minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||||
fairSharePreemptionThresholds, queueAcls,
|
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||||
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
|
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
|
||||||
reservableQueues);
|
reservableQueues);
|
||||||
|
|
||||||
|
@ -153,15 +153,18 @@ public void testAllocationFileParsing() throws Exception {
|
|||||||
// Give queue A a minimum of 1024 M
|
// Give queue A a minimum of 1024 M
|
||||||
out.println("<queue name=\"queueA\">");
|
out.println("<queue name=\"queueA\">");
|
||||||
out.println("<minResources>1024mb,0vcores</minResources>");
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
// Give queue B a minimum of 2048 M
|
// Give queue B a minimum of 2048 M
|
||||||
out.println("<queue name=\"queueB\">");
|
out.println("<queue name=\"queueB\">");
|
||||||
out.println("<minResources>2048mb,0vcores</minResources>");
|
out.println("<minResources>2048mb,0vcores</minResources>");
|
||||||
|
out.println("<maxResources>5120mb,110vcores</maxResources>");
|
||||||
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
|
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
|
||||||
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
// Give queue C no minimum
|
// Give queue C no minimum
|
||||||
out.println("<queue name=\"queueC\">");
|
out.println("<queue name=\"queueC\">");
|
||||||
|
out.println("<minResources>5120mb,0vcores</minResources>");
|
||||||
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
|
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
// Give queue D a limit of 3 running apps and 0.4f maxAMShare
|
// Give queue D a limit of 3 running apps and 0.4f maxAMShare
|
||||||
@ -190,6 +193,8 @@ public void testAllocationFileParsing() throws Exception {
|
|||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
// Set default limit of apps per queue to 15
|
// Set default limit of apps per queue to 15
|
||||||
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
|
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
|
||||||
|
// Set default limit of max resource per queue to 4G and 100 cores
|
||||||
|
out.println("<queueMaxResourcesDefault>4096mb,100vcores</queueMaxResourcesDefault>");
|
||||||
// Set default limit of apps per user to 5
|
// Set default limit of apps per user to 5
|
||||||
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
|
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
|
||||||
// Set default limit of AMResourceShare to 0.5f
|
// Set default limit of AMResourceShare to 0.5f
|
||||||
@ -222,11 +227,22 @@ public void testAllocationFileParsing() throws Exception {
|
|||||||
assertEquals(Resources.createResource(0),
|
assertEquals(Resources.createResource(0),
|
||||||
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||||
|
|
||||||
|
assertEquals(Resources.createResource(2048, 10),
|
||||||
|
queueConf.getMaxResources("root.queueA"));
|
||||||
|
assertEquals(Resources.createResource(5120, 110),
|
||||||
|
queueConf.getMaxResources("root.queueB"));
|
||||||
|
assertEquals(Resources.createResource(5120, 0),
|
||||||
|
queueConf.getMaxResources("root.queueC"));
|
||||||
|
assertEquals(Resources.createResource(4096, 100),
|
||||||
|
queueConf.getMaxResources("root.queueD"));
|
||||||
|
assertEquals(Resources.createResource(4096, 100),
|
||||||
|
queueConf.getMaxResources("root.queueE"));
|
||||||
|
|
||||||
assertEquals(Resources.createResource(1024, 0),
|
assertEquals(Resources.createResource(1024, 0),
|
||||||
queueConf.getMinResources("root.queueA"));
|
queueConf.getMinResources("root.queueA"));
|
||||||
assertEquals(Resources.createResource(2048, 0),
|
assertEquals(Resources.createResource(2048, 0),
|
||||||
queueConf.getMinResources("root.queueB"));
|
queueConf.getMinResources("root.queueB"));
|
||||||
assertEquals(Resources.createResource(0),
|
assertEquals(Resources.createResource(5120, 0),
|
||||||
queueConf.getMinResources("root.queueC"));
|
queueConf.getMinResources("root.queueC"));
|
||||||
assertEquals(Resources.createResource(0),
|
assertEquals(Resources.createResource(0),
|
||||||
queueConf.getMinResources("root.queueD"));
|
queueConf.getMinResources("root.queueD"));
|
||||||
|
@ -128,6 +128,8 @@ The allocation file must be in XML format. The format contains five types of ele
|
|||||||
|
|
||||||
* **A queueMaxAppsDefault element**: which sets the default running app limit for queues; overriden by maxRunningApps element in each queue.
|
* **A queueMaxAppsDefault element**: which sets the default running app limit for queues; overriden by maxRunningApps element in each queue.
|
||||||
|
|
||||||
|
* **A queueMaxResourcesDefault element**: which sets the default max resource limit for queue; overriden by maxResources element in each queue.
|
||||||
|
|
||||||
* **A queueMaxAMShareDefault element**: which sets the default AM resource limit for queue; overriden by maxAMShare element in each queue.
|
* **A queueMaxAMShareDefault element**: which sets the default AM resource limit for queue; overriden by maxAMShare element in each queue.
|
||||||
|
|
||||||
* **A defaultQueueSchedulingPolicy element**: which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified. Defaults to "fair".
|
* **A defaultQueueSchedulingPolicy element**: which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified. Defaults to "fair".
|
||||||
@ -167,6 +169,7 @@ The allocation file must be in XML format. The format contains five types of ele
|
|||||||
</queue>
|
</queue>
|
||||||
|
|
||||||
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
|
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
|
||||||
|
<queueMaxResourcesDefault>40000 mb,0vcores</queueMaxResourcesDefault>
|
||||||
|
|
||||||
<!-- Queue 'secondary_group_queue' is a parent queue and may have
|
<!-- Queue 'secondary_group_queue' is a parent queue and may have
|
||||||
user queues under it -->
|
user queues under it -->
|
||||||
|
Loading…
Reference in New Issue
Block a user