diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index d0425907f6..5e82f401b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -360,13 +360,8 @@ protected void recoverApplication(ApplicationStateData appState,
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime) throws YarnException {
+
if (!isRecovery) {
- // Do queue mapping
- if (rmContext.getQueuePlacementManager() != null) {
- // We only do queue mapping when it's a new application
- rmContext.getQueuePlacementManager().placeApplication(
- submissionContext, user);
- }
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
new file mode 100644
index 0000000000..f2f92b81fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+/**
+ * Each placement rule when it successfully places an application onto a queue
+ * returns a PlacementRuleContext which encapsulates the queue the
+ * application was mapped to and any parent queue for the queue (if configured)
+ */
+public class ApplicationPlacementContext {
+
+ private String queue;
+
+ private String parentQueue;
+
+ public ApplicationPlacementContext(String queue) {
+ this(queue,null);
+ }
+
+ public ApplicationPlacementContext(String queue, String parentQueue) {
+ this.queue = queue;
+ this.parentQueue = parentQueue;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public String getParentQueue() {
+ return parentQueue;
+ }
+
+ public boolean hasParentQueue() {
+ return parentQueue != null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index 43a4deb70e..c0067385b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -23,7 +23,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -53,36 +52,33 @@ public void updateRules(List
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Auto Creation enabled Parent queue. This queue initially does not have any + * children to start with and all child + * leaf queues will be auto created. Currently this does not allow other + * pre-configured leaf or parent queues to + * co-exist along with auto-created leaf queues. The auto creation is limited + * to leaf queues currently. + */ +public class ManagedParentQueue extends AbstractManagedParentQueue { + + private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false; + + private static final Logger LOG = LoggerFactory.getLogger( + ManagedParentQueue.class); + + public ManagedParentQueue(final CapacitySchedulerContext cs, + final String queueName, final CSQueue parent, final CSQueue old) + throws IOException { + super(cs, queueName, parent, old); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + this.leafQueueTemplate = initializeLeafQueueConfigs( + leafQueueTemplateConfPrefix).build(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Managed Parent Queue: ").append(queueName).append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + this.leafQueueTemplate = initializeLeafQueueConfigs( + leafQueueTemplateConfPrefix).build(); + } + + @Override + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs( + String queuePath) { + + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = + super.initializeLeafQueueConfigs(queuePath); + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf); + QueueCapacities queueCapacities = new QueueCapacities(false); + CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix, + csContext.getConfiguration(), queueCapacities, getQueueCapacities()); + leafQueueTemplate.capacities(queueCapacities); + + shouldFailAutoCreationWhenGuaranteedCapacityExceeded = + conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath()); + + return leafQueueTemplate; + } + + protected void validate(final CSQueue newlyParsedQueue) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + } + + @Override + public void addChildQueue(CSQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + + if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "Expected child queue to be an instance of AutoCreatedLeafQueue"); + } + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + ManagedParentQueue parentQueue = + (ManagedParentQueue) childQueue.getParent(); + + String leafQueueName = childQueue.getQueueName(); + int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit( + parentQueue.getQueuePath()); + + if (parentQueue.getChildQueues().size() >= maxQueues) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ".Max Child " + + "Queue limit exceeded which is configured as : " + maxQueues + + " and number of child queues is : " + parentQueue + .getChildQueues().size()); + } + + if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) { + if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity() + + parentQueue.sumOfChildAbsCapacities() > parentQueue + .getAbsoluteCapacity()) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ". Child " + + "queues capacities have reached parent queue : " + + parentQueue.getQueuePath() + " guaranteed capacity"); + } + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + super.addChildQueue(leafQueue); + //TODO - refresh policy queue after capacity management is added + + } finally { + writeLock.unlock(); + } + } + + private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { + return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index d61951bb8f..959ca51eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1081,17 +1081,4 @@ public void stopQueue() { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } - - protected float sumOfChildCapacities() { - try { - writeLock.lock(); - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); - } - return ret; - } finally { - writeLock.unlock(); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 4ab2e9f14d..b7f8aa6996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -40,6 +40,19 @@ public class PlanQueue extends AbstractManagedParentQueue { public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); + this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Plan Queue: ").append(queueName).append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); } @Override @@ -47,17 +60,21 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { validate(newlyParsedQueue); super.reinitialize(newlyParsedQueue, clusterResource); + this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); } @Override - protected void initializeLeafQueueConfigs() { - String queuePath = super.getQueuePath(); + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + (String queuePath) { + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super + .initializeLeafQueueConfigs + (queuePath); showReservationsAsQueues = csContext.getConfiguration() .getShowReservationAsQueues(queuePath); - super.initializeLeafQueueConfigs(); + return leafQueueTemplate; } - private void validate(final CSQueue newlyParsedQueue) throws IOException { + protected void validate(final CSQueue newlyParsedQueue) throws IOException { // Sanity check if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index 0a8d6fe529..80b7f2fb82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.placement
+ .ApplicationPlacementContext;
public class AppAddedSchedulerEvent extends SchedulerEvent {
@@ -31,15 +33,23 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
private final ReservationId reservationID;
private final boolean isAppRecovering;
private final Priority appPriority;
+ private final ApplicationPlacementContext placementContext;
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user) {
- this(applicationId, queue, user, false, null, Priority.newInstance(0));
+ this(applicationId, queue, user, false, null, Priority.newInstance(0),
+ null);
+ }
+
+ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+ String user, ApplicationPlacementContext placementContext) {
+ this(applicationId, queue, user, false, null, Priority.newInstance(0),
+ placementContext);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, ReservationId reservationID, Priority appPriority) {
- this(applicationId, queue, user, false, reservationID, appPriority);
+ this(applicationId, queue, user, false, reservationID, appPriority, null);
}
public AppAddedSchedulerEvent(String user,
@@ -47,12 +57,20 @@ public AppAddedSchedulerEvent(String user,
Priority appPriority) {
this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(),
- appPriority);
+ appPriority, null);
+ }
+
+ public AppAddedSchedulerEvent(String user,
+ ApplicationSubmissionContext submissionContext, boolean isAppRecovering,
+ Priority appPriority, ApplicationPlacementContext placementContext) {
+ this(submissionContext.getApplicationId(), submissionContext.getQueue(),
+ user, isAppRecovering, submissionContext.getReservationID(),
+ appPriority, placementContext);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering, ReservationId reservationID,
- Priority appPriority) {
+ Priority appPriority, ApplicationPlacementContext placementContext) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
@@ -60,6 +78,7 @@ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering;
this.appPriority = appPriority;
+ this.placementContext = placementContext;
}
public ApplicationId getApplicationId() {
@@ -85,4 +104,8 @@ public ReservationId getReservationID() {
public Priority getApplicatonPriority() {
return appPriority;
}
+
+ public ApplicationPlacementContext getPlacementContext() {
+ return placementContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 81793217c4..9445fa6efd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.MockApps;
@@ -70,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -84,7 +84,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -856,28 +855,32 @@ public void testEscapeApplicationSummary() {
Assert.assertTrue(msg.contains("preemptedResources=
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for creation and reinitilization of auto created leaf queues
+ * under a ManagedParentQueue.
+ */
+public class TestCapacitySchedulerAutoQueueCreation {
+
+ private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+ private final int GB = 1024;
+ private final static ContainerUpdates NULL_UPDATE_REQUESTS =
+ new ContainerUpdates();
+
+ private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ private static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ private static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+ private static final String A1 = A + ".a1";
+ private static final String A2 = A + ".a2";
+ private static final String B1 = B + ".b1";
+ private static final String B2 = B + ".b2";
+ private static final String B3 = B + ".b3";
+ private static final String C1 = C + ".c1";
+ private static final String C2 = C + ".c2";
+ private static final String C3 = C + ".c3";
+ private static float A_CAPACITY = 20f;
+ private static float B_CAPACITY = 40f;
+ private static float C_CAPACITY = 20f;
+ private static float D_CAPACITY = 20f;
+ private static float A1_CAPACITY = 30;
+ private static float A2_CAPACITY = 70;
+ private static float B1_CAPACITY = 60f;
+ private static float B2_CAPACITY = 20f;
+ private static float B3_CAPACITY = 20f;
+ private static float C1_CAPACITY = 20f;
+ private static float C2_CAPACITY = 20f;
+
+ private static String USER = "user_";
+ private static String USER0 = USER + 0;
+ private static String USER2 = USER + 2;
+ private static String PARENT_QUEUE = "c";
+
+ private MockRM mockRM = null;
+
+ private CapacityScheduler cs;
+
+ private final TestCapacityScheduler tcs = new TestCapacityScheduler();
+
+ private static SpyDispatcher dispatcher;
+
+ private static EventHandler
+ * root
+ * / \ \ \
+ * a b c d
+ * / \ / | \
+ * a1 a2 b1 b2 b3
+ */
+ private CapacitySchedulerConfiguration setupQueueConfiguration(
+ CapacitySchedulerConfiguration conf) {
+
+ //setup new queues with one of them auto enabled
+ // Define top-level queues
+ // Set childQueue for root
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] { "a", "b", "c", "d" });
+
+ conf.setCapacity(A, A_CAPACITY);
+ conf.setCapacity(B, B_CAPACITY);
+ conf.setCapacity(C, C_CAPACITY);
+ conf.setCapacity(D, D_CAPACITY);
+
+ // Define 2nd-level queues
+ conf.setQueues(A, new String[] { "a1", "a2" });
+ conf.setCapacity(A1, A1_CAPACITY);
+ conf.setUserLimitFactor(A1, 100.0f);
+ conf.setCapacity(A2, A2_CAPACITY);
+ conf.setUserLimitFactor(A2, 100.0f);
+
+ conf.setQueues(B, new String[] { "b1", "b2", "b3" });
+ conf.setCapacity(B1, B1_CAPACITY);
+ conf.setUserLimitFactor(B1, 100.0f);
+ conf.setCapacity(B2, B2_CAPACITY);
+ conf.setUserLimitFactor(B2, 100.0f);
+ conf.setCapacity(B3, B3_CAPACITY);
+ conf.setUserLimitFactor(B3, 100.0f);
+
+ conf.setUserLimitFactor(C, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(C, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
+
+ LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
+
+ conf.setUserLimitFactor(D, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(D, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f);
+
+ LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
+
+ return conf;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mockRM != null) {
+ mockRM.stop();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testAutoCreateLeafQueueCreation() throws Exception {
+
+ try {
+ // submit an app
+ submitApp(cs, USER0, USER0, PARENT_QUEUE);
+
+ // check preconditions
+ List