YARN-6439. Fix ReservationSystem creation of default ReservationQueue. (Carlo Curino via wangda)

This commit is contained in:
Wangda Tan 2017-04-11 14:56:18 -07:00
parent 7d873c465b
commit 4d4ad0ebb7
6 changed files with 52 additions and 3 deletions

View File

@ -41,10 +41,13 @@
import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -220,6 +223,23 @@ static CSQueue parseQueue(
queue = queue =
new PlanQueue(csContext, queueName, parent, new PlanQueue(csContext, queueName, parent,
oldQueues.get(queueName)); oldQueues.get(queueName));
//initializing the "internal" default queue, for SLS compatibility
String defReservationId =
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List<CSQueue> childQueues = new ArrayList<>();
ReservationQueue resQueue = new ReservationQueue(csContext,
defReservationId, (PlanQueue) queue);
try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
} catch (SchedulerDynamicEditException e) {
throw new IllegalStateException(e);
}
childQueues.add(resQueue);
((PlanQueue) queue).setChildQueues(childQueues);
queues.put(defReservationId, resQueue);
} else { } else {
queue = queue =
new LeafQueue(csContext, queueName, parent, new LeafQueue(csContext, queueName, parent,

View File

@ -93,10 +93,10 @@ public void reinitialize(CSQueue newlyParsedQueue,
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
if (newlyParsedParentQueue.getChildQueues().size() > 0) { if (newlyParsedParentQueue.getChildQueues().size() != 1) {
throw new IOException( throw new IOException(
"Reservable Queue should not have sub-queues in the" "Reservable Queue should not have sub-queues in the"
+ "configuration"); + "configuration expect the default reservation queue");
} }
// Set new configs // Set new configs

View File

@ -153,6 +153,13 @@ protected void verifyCapacity(Queue defQ) {
assertTrue(csQueue.getCapacity() > 0.9); assertTrue(csQueue.getCapacity() > 0.9);
} }
@Override
protected void checkDefaultQueueBeforePlanFollowerRun(){
Queue defQ = getDefaultQueue();
Assert.assertEquals(0, getNumberOfApplications(defQ));
Assert.assertNotNull(defQ);
}
@Override @Override
protected Queue getDefaultQueue() { protected Queue getDefaultQueue() {
return cs.getQueue("dedicated" + ReservationConstants.DEFAULT_QUEUE_SUFFIX); return cs.getQueue("dedicated" + ReservationConstants.DEFAULT_QUEUE_SUFFIX);

View File

@ -131,6 +131,10 @@ public void testWithKillOnExpiry() throws PlanningException,
testPlanFollower(false); testPlanFollower(false);
} }
@Override
protected void checkDefaultQueueBeforePlanFollowerRun() {
Assert.assertNull(getDefaultQueue());
}
@Override @Override
protected void verifyCapacity(Queue defQ) { protected void verifyCapacity(Queue defQ) {
assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9); assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9);

View File

@ -89,6 +89,11 @@ protected void testPlanFollower(boolean isMove) throws PlanningException,
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
.generateAllocation(10L, 1L, f2), res, minAlloc), false)); .generateAllocation(10L, 1L, f2), res, minAlloc), false));
// default reseration queue should exist before run of PlanFollower AND have
// no apps
checkDefaultQueueBeforePlanFollowerRun();
AbstractSchedulerPlanFollower planFollower = createPlanFollower(); AbstractSchedulerPlanFollower planFollower = createPlanFollower();
when(mClock.getTime()).thenReturn(0L); when(mClock.getTime()).thenReturn(0L);
@ -108,8 +113,8 @@ protected void testPlanFollower(boolean isMove) throws PlanningException,
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false); new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
scheduler.handle(appAttemptAddedEvent); scheduler.handle(appAttemptAddedEvent);
// initial default reservation queue should have no apps
// initial default reservation queue should have no apps after first run
Queue defQ = getDefaultQueue(); Queue defQ = getDefaultQueue();
Assert.assertEquals(0, getNumberOfApplications(defQ)); Assert.assertEquals(0, getNumberOfApplications(defQ));
@ -179,6 +184,8 @@ protected void testPlanFollower(boolean isMove) throws PlanningException,
verifyCapacity(defQ); verifyCapacity(defQ);
} }
protected abstract void checkDefaultQueueBeforePlanFollowerRun();
protected abstract Queue getReservationQueue(String reservationId); protected abstract Queue getReservationQueue(String reservationId);
protected abstract void verifyCapacity(Queue defQ); protected abstract void verifyCapacity(Queue defQ);

View File

@ -76,6 +76,12 @@ public void setUp() {
public void testRefreshQueuesWithReservations() throws Exception { public void testRefreshQueuesWithReservations() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
//set default queue capacity to zero
((ReservationQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement(
new QueueEntitlement(0f, 1f));
// Test add one reservation dynamically and manually modify capacity // Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 = ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
@ -120,6 +126,11 @@ public void testAddQueueFailCases() throws Exception {
ReservationQueue a1 = ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1); cs.addQueue(a1);
//set default queue capacity to zero
((ReservationQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement(
new QueueEntitlement(0f, 1f));
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
// Test add another reservation queue and use setEntitlement to modify // Test add another reservation queue and use setEntitlement to modify