YARN-4565. Fix a bug that leads to AM resource limit not hornored when sizeBasedWeight enabled for FairOrderingPolicy. Contributed by Wangda Tan
This commit is contained in:
parent
92c5f565fd
commit
edc43a9097
@ -1280,6 +1280,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-4502. Fix two AM containers get allocated when AM restart.
|
||||
(Vinod Kumar Vavilapalli via wangda)
|
||||
|
||||
YARN-4565. Fix a bug that leads to AM resource limit not hornored when
|
||||
sizeBasedWeight enabled for FairOrderingPolicy. (wtan via jianhe)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -863,7 +863,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ResourceUsage getSchedulingResourceUsage() {
|
||||
public ResourceUsage getSchedulingResourceUsage() {
|
||||
return attemptResourceUsage;
|
||||
}
|
||||
|
||||
|
@ -978,7 +978,8 @@ public class CapacityScheduler extends
|
||||
clusterResource, getMinimumResourceCapability());
|
||||
}
|
||||
|
||||
if (updateDemandForQueue != null) {
|
||||
if (updateDemandForQueue != null && !application
|
||||
.isWaitingForAMContainer()) {
|
||||
updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@ import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -959,4 +960,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||
DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
|
||||
return defaultPriority;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setOrderingPolicy(String queue, String policy) {
|
||||
set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setOrderingPolicyParameter(String queue,
|
||||
String parameterKey, String parameterValue) {
|
||||
set(getQueuePrefix(queue) + ORDERING_POLICY + "."
|
||||
+ parameterKey, parameterValue);
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -138,6 +150,48 @@ public class TestFairOrderingPolicy {
|
||||
checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSizeBasedWeightNotAffectAppActivation() throws Exception {
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
|
||||
// Define top-level queues
|
||||
String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
|
||||
csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
|
||||
csConf.setOrderingPolicyParameter(queuePath,
|
||||
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
|
||||
csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm = new MockRM(csConf);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
// Get LeafQueue
|
||||
LeafQueue lq = (LeafQueue) cs.getQueue("default");
|
||||
OrderingPolicy<FiCaSchedulerApp> policy = lq.getOrderingPolicy();
|
||||
Assert.assertTrue(policy instanceof FairOrderingPolicy);
|
||||
Assert.assertTrue(((FairOrderingPolicy<FiCaSchedulerApp>)policy).getSizeBasedWeight());
|
||||
|
||||
rm.registerNode("h1:1234", 10 * GB);
|
||||
|
||||
// Submit 4 apps
|
||||
rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
|
||||
Assert.assertEquals(1, lq.getNumActiveApplications());
|
||||
Assert.assertEquals(3, lq.getNumPendingApplications());
|
||||
|
||||
// Try allocate once, #active-apps and #pending-apps should be still correct
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(NodeId.newInstance("h1", 1234))));
|
||||
Assert.assertEquals(1, lq.getNumActiveApplications());
|
||||
Assert.assertEquals(3, lq.getNumPendingApplications());
|
||||
}
|
||||
|
||||
public void checkIds(Iterator<MockSchedulableEntity> si,
|
||||
String[] ids) {
|
||||
for (int i = 0;i < ids.length;i++) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user