YARN-4140. RM container allocation delayed incase of app submitted to Nodelabel partition. (Bibin A Chundatt via wangda)
This commit is contained in:
parent
4f6e842ba9
commit
def374e666
@ -918,6 +918,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-4235. FairScheduler PrimaryGroup does not handle empty groups returned
|
||||
for a user. (Anubhav Dhoot via rohithsharmaks)
|
||||
|
||||
YARN-4140. RM container allocation delayed incase of app submitted to
|
||||
Nodelabel partition. (Bibin A Chundatt via wangda)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -308,6 +308,18 @@ public static ResourceRequest newResourceRequest(Priority priority,
|
||||
return request;
|
||||
}
|
||||
|
||||
public static ResourceRequest newResourceRequest(Priority priority,
|
||||
String hostName, Resource capability, int numContainers, String label) {
|
||||
ResourceRequest request =
|
||||
recordFactory.newRecordInstance(ResourceRequest.class);
|
||||
request.setPriority(priority);
|
||||
request.setResourceName(hostName);
|
||||
request.setCapability(capability);
|
||||
request.setNumContainers(numContainers);
|
||||
request.setNodeLabelExpression(label);
|
||||
return request;
|
||||
}
|
||||
|
||||
public static ResourceRequest newResourceRequest(ResourceRequest r) {
|
||||
ResourceRequest request = recordFactory
|
||||
.newRecordInstance(ResourceRequest.class);
|
||||
|
@ -332,6 +332,30 @@ synchronized public boolean updateResourceRequests(
|
||||
if (request.getNumContainers() > 0) {
|
||||
activeUsersManager.activateApplication(user, applicationId);
|
||||
}
|
||||
ResourceRequest previousAnyRequest =
|
||||
getResourceRequest(priority, resourceName);
|
||||
|
||||
// When there is change in ANY request label expression, we should
|
||||
// update label for all resource requests already added of same
|
||||
// priority as ANY resource request.
|
||||
if ((null == previousAnyRequest)
|
||||
|| isRequestLabelChanged(previousAnyRequest, request)) {
|
||||
Map<String, ResourceRequest> resourceRequest =
|
||||
getResourceRequests(priority);
|
||||
if (resourceRequest != null) {
|
||||
for (ResourceRequest r : resourceRequest.values()) {
|
||||
if (!r.getResourceName().equals(ResourceRequest.ANY)) {
|
||||
r.setNodeLabelExpression(request.getNodeLabelExpression());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ResourceRequest anyRequest =
|
||||
getResourceRequest(priority, ResourceRequest.ANY);
|
||||
if (anyRequest != null) {
|
||||
request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
|
||||
@ -369,12 +393,13 @@ synchronized public boolean updateResourceRequests(
|
||||
lastRequestCapability);
|
||||
|
||||
// update queue:
|
||||
Resource increasedResource = Resources.multiply(request.getCapability(),
|
||||
request.getNumContainers());
|
||||
queue.incPendingResource(
|
||||
request.getNodeLabelExpression(),
|
||||
Resource increasedResource =
|
||||
Resources.multiply(request.getCapability(),
|
||||
request.getNumContainers());
|
||||
queue.incPendingResource(request.getNodeLabelExpression(),
|
||||
increasedResource);
|
||||
appResourceUsage.incPending(request.getNodeLabelExpression(),
|
||||
increasedResource);
|
||||
appResourceUsage.incPending(request.getNodeLabelExpression(), increasedResource);
|
||||
if (lastRequest != null) {
|
||||
Resource decreasedResource =
|
||||
Resources.multiply(lastRequestCapability, lastRequestContainers);
|
||||
@ -388,6 +413,13 @@ synchronized public boolean updateResourceRequests(
|
||||
return anyResourcesUpdated;
|
||||
}
|
||||
|
||||
private boolean isRequestLabelChanged(ResourceRequest requestOne,
|
||||
ResourceRequest requestTwo) {
|
||||
String requestOneLabelExp = requestOne.getNodeLabelExpression();
|
||||
String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
|
||||
return (!(requestOneLabelExp.equals(requestTwoLabelExp)));
|
||||
}
|
||||
|
||||
/**
|
||||
* The ApplicationMaster is updating the userBlacklist used for containers
|
||||
* other than AMs.
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -81,18 +82,20 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
|
||||
|
||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
conf.setCapacity(A, 10);
|
||||
conf.setMaximumCapacity(A, 15);
|
||||
conf.setAccessibleNodeLabels(A, toSet("x"));
|
||||
conf.setCapacityByLabel(A, "x", 100);
|
||||
|
||||
|
||||
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
conf.setCapacity(B, 20);
|
||||
conf.setAccessibleNodeLabels(B, toSet("y"));
|
||||
conf.setAccessibleNodeLabels(B, toSet("y", "z"));
|
||||
conf.setCapacityByLabel(B, "y", 100);
|
||||
|
||||
conf.setCapacityByLabel(B, "z", 100);
|
||||
|
||||
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
conf.setCapacity(C, 70);
|
||||
conf.setMaximumCapacity(C, 70);
|
||||
@ -110,6 +113,7 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
||||
conf.setCapacity(B1, 100);
|
||||
conf.setMaximumCapacity(B1, 100);
|
||||
conf.setCapacityByLabel(B1, "y", 100);
|
||||
conf.setCapacityByLabel(B1, "z", 100);
|
||||
|
||||
final String C1 = C + ".c1";
|
||||
conf.setQueues(C, new String[] {"c1"});
|
||||
@ -474,7 +478,111 @@ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
|
||||
SchedulerNode node = cs.getSchedulerNode(nodeId);
|
||||
Assert.assertEquals(numContainers, node.getNumContainers());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* JIRA YARN-4140, In Resource request set node label will be set only on ANY
|
||||
* reqest. RACK/NODE local and default requests label expression need to be
|
||||
* updated. This testcase is to verify the label expression is getting changed
|
||||
* based on ANY requests.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testResourceRequestUpdateNodePartitions() throws Exception {
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabels(ImmutableSet.of(NodeLabel.newInstance("x"),
|
||||
NodeLabel.newInstance("y", false), NodeLabel.newInstance("z", false)));
|
||||
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
|
||||
// inject node label manager
|
||||
MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // no label
|
||||
MockNM nm2 = rm1.registerNode("h2:1234", 40 * GB); // label = y
|
||||
// launch an app to queue b1 (label = y), AM container should be launched in
|
||||
// nm2
|
||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
|
||||
// Creating request set when request before ANY is not having label and any
|
||||
// is having label
|
||||
List<ResourceRequest> resourceRequest = new ArrayList<ResourceRequest>();
|
||||
resourceRequest.add(am1.createResourceReq("/default-rack", 1024, 3, 1,
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
resourceRequest.add(am1.createResourceReq("*", 1024, 3, 5, "y"));
|
||||
resourceRequest.add(am1.createResourceReq("h1:1234", 1024, 3, 2,
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
resourceRequest.add(am1.createResourceReq("*", 1024, 2, 3, "y"));
|
||||
resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
|
||||
resourceRequest.add(am1.createResourceReq("*", 1024, 4, 3, null));
|
||||
resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 4, 4, null));
|
||||
am1.allocate(resourceRequest, new ArrayList<ContainerId>());
|
||||
CapacityScheduler cs =
|
||||
(CapacityScheduler) rm1.getRMContext().getScheduler();
|
||||
FiCaSchedulerApp app =
|
||||
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||
List<ResourceRequest> allResourceRequests =
|
||||
app.getAppSchedulingInfo().getAllResourceRequests();
|
||||
for (ResourceRequest changeReq : allResourceRequests) {
|
||||
if (changeReq.getPriority().getPriority() == 2
|
||||
|| changeReq.getPriority().getPriority() == 3) {
|
||||
Assert.assertEquals("Expected label y", "y",
|
||||
changeReq.getNodeLabelExpression());
|
||||
} else if (changeReq.getPriority().getPriority() == 4) {
|
||||
Assert.assertEquals("Expected label EMPTY",
|
||||
RMNodeLabelsManager.NO_LABEL, changeReq.getNodeLabelExpression());
|
||||
}
|
||||
}
|
||||
|
||||
// Previous any request was Y trying to update with z and the
|
||||
// request before ANY label is null
|
||||
List<ResourceRequest> newReq = new ArrayList<ResourceRequest>();
|
||||
newReq.add(am1.createResourceReq("h2:1234", 1024, 3, 4, null));
|
||||
newReq.add(am1.createResourceReq("*", 1024, 3, 5, "z"));
|
||||
newReq.add(am1.createResourceReq("h1:1234", 1024, 3, 4, null));
|
||||
newReq.add(am1.createResourceReq("*", 1024, 4, 5, "z"));
|
||||
am1.allocate(newReq, new ArrayList<ContainerId>());
|
||||
allResourceRequests = app.getAppSchedulingInfo().getAllResourceRequests();
|
||||
for (ResourceRequest changeReq : allResourceRequests) {
|
||||
if (changeReq.getPriority().getPriority() == 3
|
||||
|| changeReq.getPriority().getPriority() == 4) {
|
||||
Assert.assertEquals("Expected label z", "z",
|
||||
changeReq.getNodeLabelExpression());
|
||||
} else if (changeReq.getPriority().getPriority() == 2) {
|
||||
Assert.assertEquals("Expected label y", "y",
|
||||
changeReq.getNodeLabelExpression());
|
||||
}
|
||||
}
|
||||
// Request before ANY and ANY request is set as NULL. Request should be set
|
||||
// with Empty Label
|
||||
List<ResourceRequest> resourceRequest1 = new ArrayList<ResourceRequest>();
|
||||
resourceRequest1.add(am1.createResourceReq("/default-rack", 1024, 3, 1,
|
||||
null));
|
||||
resourceRequest1.add(am1.createResourceReq("*", 1024, 3, 5, null));
|
||||
resourceRequest1.add(am1.createResourceReq("h1:1234", 1024, 3, 2,
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
resourceRequest1.add(am1.createResourceReq("/default-rack", 1024, 2, 1,
|
||||
null));
|
||||
resourceRequest1.add(am1.createResourceReq("*", 1024, 2, 3,
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
resourceRequest1.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
|
||||
am1.allocate(resourceRequest1, new ArrayList<ContainerId>());
|
||||
allResourceRequests = app.getAppSchedulingInfo().getAllResourceRequests();
|
||||
for (ResourceRequest changeReq : allResourceRequests) {
|
||||
if (changeReq.getPriority().getPriority() == 3) {
|
||||
Assert.assertEquals("Expected label Empty",
|
||||
RMNodeLabelsManager.NO_LABEL, changeReq.getNodeLabelExpression());
|
||||
} else if (changeReq.getPriority().getPriority() == 2) {
|
||||
Assert.assertEquals("Expected label y", RMNodeLabelsManager.NO_LABEL,
|
||||
changeReq.getNodeLabelExpression());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
|
||||
/**
|
||||
|
@ -38,6 +38,7 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
@ -108,6 +109,7 @@ protected ResourceRequest createResourceRequest(
|
||||
prio.setPriority(priority);
|
||||
request.setPriority(prio);
|
||||
request.setRelaxLocality(relaxLocality);
|
||||
request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
|
||||
return request;
|
||||
}
|
||||
|
||||
|
@ -70,6 +70,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
@ -112,7 +113,6 @@ public class TestFifoScheduler {
|
||||
|
||||
private ResourceManager resourceManager = null;
|
||||
private static Configuration conf;
|
||||
|
||||
private static final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
@ -937,9 +937,11 @@ public void testBlackListNodes() throws Exception {
|
||||
// Ask for a 1 GB container for app 1
|
||||
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
|
||||
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||
"rack1", BuilderUtils.newResource(GB, 1), 1));
|
||||
"rack1", BuilderUtils.newResource(GB, 1), 1,
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
|
||||
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1,
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
fs.allocate(appAttemptId1, ask1, emptyId,
|
||||
Collections.singletonList(host_1_0), null, null, null);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user