YARN-4617. LeafQueue#pendingOrderingPolicy should always use fixed ordering policy instead of using same as active applications ordering policy. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2016-01-29 12:22:06 -08:00
parent eddd823cd6
commit f4a57d4a53
10 changed files with 244 additions and 73 deletions

View File

@ -1361,6 +1361,10 @@ Release 2.8.0 - UNRELEASED
YARN-4643. Container recovery is broken with delegating container runtime YARN-4643. Container recovery is broken with delegating container runtime
(Sidharta Seethana via jlowe) (Sidharta Seethana via jlowe)
YARN-4617. LeafQueue#pendingOrderingPolicy should always use fixed ordering
policy instead of using same as active applications ordering policy.
(Rohith Sharma K S via jianhe)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -162,6 +162,10 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.PriorityComparator" /> <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.PriorityComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match> <Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" /> <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />

View File

@ -968,7 +968,8 @@ protected void getActivedAppDiagnosticMessage(
// queue's resource usage for specific partition // queue's resource usage for specific partition
} }
public boolean isAttemptRecovering() { @Override
public boolean isRecovering() {
return isAttemptRecovering; return isAttemptRecovering;
} }

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@ -97,9 +98,6 @@ public class LeafQueue extends AbstractCSQueue {
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null; private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
// Always give preference to this while activating the application attempts.
private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
private volatile float minimumAllocationFactor; private volatile float minimumAllocationFactor;
private Map<String, User> users = new HashMap<String, User>(); private Map<String, User> users = new HashMap<String, User>();
@ -126,6 +124,7 @@ public class LeafQueue extends AbstractCSQueue {
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers = private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
new HashMap<>(); new HashMap<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs, public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException { String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old); super(cs, queueName, parent, old);
@ -133,6 +132,9 @@ public LeafQueue(CapacitySchedulerContext cs,
this.activeUsersManager = new ActiveUsersManager(metrics); this.activeUsersManager = new ActiveUsersManager(metrics);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath()); + ", fullname=" + getQueuePath());
@ -159,10 +161,6 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
CapacitySchedulerConfiguration conf = csContext.getConfiguration(); CapacitySchedulerConfiguration conf = csContext.getConfiguration();
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicy(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicyRecovery(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath()); userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@ -327,8 +325,7 @@ public synchronized int getNumApplications() {
} }
public synchronized int getNumPendingApplications() { public synchronized int getNumPendingApplications() {
return pendingOrderingPolicy.getNumSchedulableEntities() return pendingOrderingPolicy.getNumSchedulableEntities();
+ pendingOPForRecoveredApps.getNumSchedulableEntities();
} }
public synchronized int getNumActiveApplications() { public synchronized int getNumActiveApplications() {
@ -627,18 +624,9 @@ private synchronized void activateApplications() {
calculateAndGetAMResourceLimitPerPartition(nodePartition); calculateAndGetAMResourceLimitPerPartition(nodePartition);
} }
activateApplications(getPendingAppsOrderingPolicyRecovery() for (Iterator<FiCaSchedulerApp> fsApp =
.getAssignmentIterator(), userAmPartitionLimit); getPendingAppsOrderingPolicy().getAssignmentIterator();
fsApp.hasNext();) {
activateApplications(
getPendingAppsOrderingPolicy().getAssignmentIterator(),
userAmPartitionLimit);
}
private synchronized void activateApplications(
Iterator<FiCaSchedulerApp> fsApp,
Map<String, Resource> userAmPartitionLimit) {
while (fsApp.hasNext()) {
FiCaSchedulerApp application = fsApp.next(); FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId(); ApplicationId applicationId = application.getApplicationId();
@ -740,11 +728,7 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
User user) { User user) {
// Accept // Accept
user.submitApplication(); user.submitApplication();
if (application.isAttemptRecovering()) { getPendingAppsOrderingPolicy().addSchedulableEntity(application);
getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
} else {
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
}
applicationAttemptMap.put(application.getApplicationAttemptId(), application); applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications // Activate applications
@ -784,11 +768,7 @@ public synchronized void removeApplicationAttempt(
boolean wasActive = boolean wasActive =
orderingPolicy.removeSchedulableEntity(application); orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) { if (!wasActive) {
if (application.isAttemptRecovering()) { pendingOrderingPolicy.removeSchedulableEntity(application);
pendingOPForRecoveredApps.removeSchedulableEntity(application);
} else {
pendingOrderingPolicy.removeSchedulableEntity(application);
}
} else { } else {
queueUsage.decAMUsed(partitionName, queueUsage.decAMUsed(partitionName,
application.getAMResource(partitionName)); application.getAMResource(partitionName));
@ -1539,18 +1519,16 @@ public void recoverContainer(Resource clusterResource,
* Obtain (read-only) collection of pending applications. * Obtain (read-only) collection of pending applications.
*/ */
public Collection<FiCaSchedulerApp> getPendingApplications() { public Collection<FiCaSchedulerApp> getPendingApplications() {
Collection<FiCaSchedulerApp> pendingApps = return Collections.unmodifiableCollection(pendingOrderingPolicy
new ArrayList<FiCaSchedulerApp>(); .getSchedulableEntities());
pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
return pendingApps;
} }
/** /**
* Obtain (read-only) collection of active applications. * Obtain (read-only) collection of active applications.
*/ */
public Collection<FiCaSchedulerApp> getApplications() { public Collection<FiCaSchedulerApp> getApplications() {
return orderingPolicy.getSchedulableEntities(); return Collections.unmodifiableCollection(orderingPolicy
.getSchedulableEntities());
} }
// Consider the headroom for each user in the queue. // Consider the headroom for each user in the queue.
@ -1587,10 +1565,6 @@ public synchronized Resource getTotalPendingResourcesConsideringUserLimit(
@Override @Override
public synchronized void collectSchedulerApplications( public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) { Collection<ApplicationAttemptId> apps) {
for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
}
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) { .getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId()); apps.add(pendingApp.getApplicationAttemptId());
@ -1759,30 +1733,6 @@ public void decreaseContainer(Resource clusterResource,
getPendingAppsOrderingPolicy() { getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy; return pendingOrderingPolicy;
} }
public synchronized void setPendingAppsOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy) {
if (null != this.pendingOrderingPolicy) {
pendingOrderingPolicy
.addAllSchedulableEntities(this.pendingOrderingPolicy
.getSchedulableEntities());
}
this.pendingOrderingPolicy = pendingOrderingPolicy;
}
public synchronized OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicyRecovery() {
return pendingOPForRecoveredApps;
}
public synchronized void setPendingAppsOrderingPolicyRecovery(
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
if (null != this.pendingOPForRecoveredApps) {
pendingOrderingPolicyRecovery
.addAllSchedulableEntities(this.pendingOPForRecoveredApps
.getSchedulableEntities());
}
this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
}
/* /*
* Holds shared values used by all applications in * Holds shared values used by all applications in

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* This ordering policy is used for pending applications only.
* An OrderingPolicy which orders SchedulableEntities by
* <ul>
* <li>Recovering application
* <li>Priority of an application
* <li>Input order
* </ul>
* <p>
* Example : If schedulableEntities with E1(true,1,1) E2(true,2,2) E3(true,3,3)
* E4(false,4,4) E5(false,4,5) are added. The ordering policy assignment
* iterator is in the order of E3(true,3,3) E2(true,2,2) E1(true,1,1)
* E5(false,5,5) E4(false,4,4)
*/
public class FifoOrderingPolicyForPendingApps<S extends SchedulableEntity>
extends AbstractComparatorOrderingPolicy<S> {
public FifoOrderingPolicyForPendingApps() {
List<Comparator<SchedulableEntity>> comparators =
new ArrayList<Comparator<SchedulableEntity>>();
comparators.add(new RecoveryComparator());
comparators.add(new PriorityComparator());
comparators.add(new FifoComparator());
this.comparator = new CompoundComparator(comparators);
this.schedulableEntities = new TreeSet<S>(comparator);
}
@Override
public String getInfo() {
return "FifoOrderingPolicyForPendingApps";
}
@Override
public void configure(Map<String, String> conf) {
}
@Override
public void containerAllocated(S schedulableEntity, RMContainer r) {
}
@Override
public void containerReleased(S schedulableEntity, RMContainer r) {
}
@Override
public void demandUpdated(S schedulableEntity) {
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.Comparator;
/**
* A Comparator which orders SchedulableEntities by isRecovering flag.
*/
public class RecoveryComparator implements Comparator<SchedulableEntity> {
@Override
public int compare(SchedulableEntity se1, SchedulableEntity se2) {
int val1 = se1.isRecovering() ? 1 : 0;
int val2 = se2.isRecovering() ? 1 : 0;
return val2 - val1;
}
}

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
/** /**
* A SchedulableEntity is a process to be scheduled, * A SchedulableEntity is a process to be scheduled.
* for example, an application / application attempt * for example, an application / application attempt
*/ */
public interface SchedulableEntity { public interface SchedulableEntity {
@ -53,4 +50,9 @@ public interface SchedulableEntity {
*/ */
public Priority getPriority(); public Priority getPriority();
/**
* Whether application was running before RM restart.
*/
public boolean isRecovering();
} }

View File

@ -2399,7 +2399,6 @@ public void testFifoAssignment() throws Exception {
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
String host_0_0 = "127.0.0.1"; String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0"; String rack_0 = "rack_0";
@ -2549,7 +2548,6 @@ public void testFairAssignment() throws Exception {
new FairOrderingPolicy<FiCaSchedulerApp>(); new FairOrderingPolicy<FiCaSchedulerApp>();
a.setOrderingPolicy(schedulingOrder); a.setOrderingPolicy(schedulingOrder);
a.setPendingAppsOrderingPolicy(new FairOrderingPolicy<FiCaSchedulerApp>());
String host_0_0 = "127.0.0.1"; String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0"; String rack_0 = "rack_0";

View File

@ -32,9 +32,17 @@ public class MockSchedulableEntity implements SchedulableEntity {
private String id; private String id;
private long serial = 0; private long serial = 0;
private Priority priority; private Priority priority;
private boolean isRecovering;
public MockSchedulableEntity() { } public MockSchedulableEntity() { }
public MockSchedulableEntity(long serial, int priority,
boolean isRecovering) {
this.serial = serial;
this.priority = Priority.newInstance(priority);
this.isRecovering = isRecovering;
}
public void setId(String id) { public void setId(String id) {
this.id = id; this.id = id;
} }
@ -84,4 +92,13 @@ public Priority getPriority() {
public void setApplicationPriority(Priority priority) { public void setApplicationPriority(Priority priority) {
this.priority = priority; this.priority = priority;
} }
@Override
public boolean isRecovering() {
return isRecovering;
}
protected void setRecovering(boolean entityRecovering) {
this.isRecovering = entityRecovering;
}
} }

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.junit.Assert;
import org.junit.Test;
public class TestFifoOrderingPolicyForPendingApps {
@Test
public void testFifoOrderingPolicyForPendingApps() {
FifoOrderingPolicyForPendingApps<MockSchedulableEntity> policy =
new FifoOrderingPolicyForPendingApps<MockSchedulableEntity>();
MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity();
Assert.assertEquals(policy.getComparator().compare(r1, r2), 0);
r1.setSerial(1);
r1.setRecovering(true);
Assert.assertEquals(policy.getComparator().compare(r1, r2), -1);
r1.setRecovering(false);
r2.setSerial(2);
r2.setRecovering(true);
Assert.assertEquals(policy.getComparator().compare(r1, r2), 1);
}
/**
* Entities submitted with E1-Recovering, E2-Recovering, E3-Recovering, E4-not
* recovering, E5-not recovering.
* Expected Iterator Output : E-3 E-2 E-1 E-5 E-4
*/
@Test
public void testIterators() {
OrderingPolicy<MockSchedulableEntity> schedOrder =
new FifoOrderingPolicyForPendingApps<MockSchedulableEntity>();
MockSchedulableEntity msp1 = new MockSchedulableEntity(1, 1, true);
MockSchedulableEntity msp2 = new MockSchedulableEntity(2, 2, true);
MockSchedulableEntity msp3 = new MockSchedulableEntity(3, 3, true);
MockSchedulableEntity msp4 = new MockSchedulableEntity(4, 2, true);
MockSchedulableEntity msp5 = new MockSchedulableEntity(5, 5, false);
MockSchedulableEntity msp6 = new MockSchedulableEntity(6, 6, false);
MockSchedulableEntity msp7 = new MockSchedulableEntity(7, 5, false);
schedOrder.addSchedulableEntity(msp1);
schedOrder.addSchedulableEntity(msp2);
schedOrder.addSchedulableEntity(msp3);
schedOrder.addSchedulableEntity(msp4);
schedOrder.addSchedulableEntity(msp5);
schedOrder.addSchedulableEntity(msp6);
schedOrder.addSchedulableEntity(msp7);
// Assignment with serial id's are 3,2,4,1,6,5,7
checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
6, 5, 7 });
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,
4, 2, 3 });
}
public void checkSerials(Iterator<MockSchedulableEntity> si,
long[] serials) {
for (int i = 0; i < serials.length; i++) {
Assert.assertEquals(si.next().getSerial(), serials[i]);
}
}
}