YARN-4082. Container shouldn't be killed when node's label updated. Contributed by Wangda Tan.
This commit is contained in:
parent
f4d96be6c6
commit
bf669b6d9f
@ -804,6 +804,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id
|
||||
has not been reset synchronously. (Jun Gong via rohithsharmaks)
|
||||
|
||||
YARN-4082. Container shouldn't be killed when node's label updated.
|
||||
(Wangda Tan via vvasudev)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
@ -543,6 +544,32 @@ public void decPendingResource(String nodeLabel, Resource resourceToDec) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incUsedResource(String nodeLabel, Resource resourceToInc,
|
||||
SchedulerApplicationAttempt application) {
|
||||
if (nodeLabel == null) {
|
||||
nodeLabel = RMNodeLabelsManager.NO_LABEL;
|
||||
}
|
||||
// ResourceUsage has its own lock, no addition lock needs here.
|
||||
queueUsage.incUsed(nodeLabel, resourceToInc);
|
||||
if (null != parent) {
|
||||
parent.incUsedResource(nodeLabel, resourceToInc, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decUsedResource(String nodeLabel, Resource resourceToDec,
|
||||
SchedulerApplicationAttempt application) {
|
||||
if (nodeLabel == null) {
|
||||
nodeLabel = RMNodeLabelsManager.NO_LABEL;
|
||||
}
|
||||
// ResourceUsage has its own lock, no addition lock needs here.
|
||||
queueUsage.decUsed(nodeLabel, resourceToDec);
|
||||
if (null != parent) {
|
||||
parent.decUsedResource(nodeLabel, resourceToDec, null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return if the queue has pending resource on given nodePartition and
|
||||
* schedulingMode.
|
||||
|
@ -37,6 +37,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
|
||||
@ -287,4 +288,29 @@ public void attachContainer(Resource clusterResource,
|
||||
* @return resourceUsage
|
||||
*/
|
||||
public ResourceUsage getQueueResourceUsage();
|
||||
|
||||
/**
|
||||
* When partition of node updated, we will update queue's resource usage if it
|
||||
* has container(s) running on that.
|
||||
*/
|
||||
public void incUsedResource(String nodePartition, Resource resourceToInc,
|
||||
SchedulerApplicationAttempt application);
|
||||
|
||||
/**
|
||||
* When partition of node updated, we will update queue's resource usage if it
|
||||
* has container(s) running on that.
|
||||
*/
|
||||
public void decUsedResource(String nodePartition, Resource resourceToDec,
|
||||
SchedulerApplicationAttempt application);
|
||||
|
||||
/**
|
||||
* When an outstanding resource is fulfilled or canceled, calling this will
|
||||
* decrease pending resource in a queue.
|
||||
*
|
||||
* @param nodeLabel
|
||||
* asked by application
|
||||
* @param resourceToDec
|
||||
* new resource asked
|
||||
*/
|
||||
public void decPendingResource(String nodeLabel, Resource resourceToDec);
|
||||
}
|
||||
|
@ -1040,12 +1040,6 @@ private synchronized void updateNodeAndQueueResource(RMNode nm,
|
||||
|
||||
/**
|
||||
* Process node labels update on a node.
|
||||
*
|
||||
* TODO: Currently capacity scheduler will kill containers on a node when
|
||||
* labels on the node changed. It is a simply solution to ensure guaranteed
|
||||
* capacity on labels of queues. When YARN-2498 completed, we can let
|
||||
* preemption policy to decide if such containers need to be killed or just
|
||||
* keep them running.
|
||||
*/
|
||||
private synchronized void updateLabelsOnNode(NodeId nodeId,
|
||||
Set<String> newLabels) {
|
||||
@ -1060,17 +1054,31 @@ private synchronized void updateLabelsOnNode(NodeId nodeId,
|
||||
return;
|
||||
}
|
||||
|
||||
// Kill running containers since label is changed
|
||||
// Get new partition, we have only one partition per node
|
||||
String newPartition;
|
||||
if (newLabels.isEmpty()) {
|
||||
newPartition = RMNodeLabelsManager.NO_LABEL;
|
||||
} else {
|
||||
newPartition = newLabels.iterator().next();
|
||||
}
|
||||
|
||||
// old partition as well
|
||||
String oldPartition = node.getPartition();
|
||||
|
||||
// Update resources of these containers
|
||||
for (RMContainer rmContainer : node.getRunningContainers()) {
|
||||
ContainerId containerId = rmContainer.getContainerId();
|
||||
completedContainer(rmContainer,
|
||||
ContainerStatus.newInstance(containerId,
|
||||
ContainerState.COMPLETE,
|
||||
String.format(
|
||||
"Container=%s killed since labels on the node=%s changed",
|
||||
containerId.toString(), nodeId.toString()),
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL);
|
||||
FiCaSchedulerApp application =
|
||||
getApplicationAttempt(rmContainer.getApplicationAttemptId());
|
||||
if (null != application) {
|
||||
application.nodePartitionUpdated(rmContainer, oldPartition,
|
||||
newPartition);
|
||||
} else {
|
||||
LOG.warn("There's something wrong, some RMContainers running on"
|
||||
+ " a node, but we cannot find SchedulerApplicationAttempt for it. Node="
|
||||
+ node.getNodeID() + " applicationAttemptId="
|
||||
+ rmContainer.getApplicationAttemptId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Unreserve container on this node
|
||||
|
@ -1262,6 +1262,22 @@ public synchronized void updateClusterResource(Resource clusterResource,
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incUsedResource(String nodeLabel, Resource resourceToInc,
|
||||
SchedulerApplicationAttempt application) {
|
||||
getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
|
||||
resourceToInc);
|
||||
super.incUsedResource(nodeLabel, resourceToInc, application);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decUsedResource(String nodeLabel, Resource resourceToDec,
|
||||
SchedulerApplicationAttempt application) {
|
||||
getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
|
||||
resourceToDec);
|
||||
super.decUsedResource(nodeLabel, resourceToDec, application);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static class User {
|
||||
ResourceUsage userResourceUsage = new ResourceUsage();
|
||||
|
@ -443,4 +443,13 @@ public CSAssignment assignContainers(Resource clusterResource,
|
||||
schedulingMode, currentResourceLimits, reservedContainer);
|
||||
}
|
||||
}
|
||||
|
||||
public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
|
||||
String newPartition) {
|
||||
Resource containerResource = rmContainer.getAllocatedResource();
|
||||
this.attemptResourceUsage.decUsed(oldPartition, containerResource);
|
||||
this.attemptResourceUsage.incUsed(newPartition, containerResource);
|
||||
getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
|
||||
getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
|
||||
}
|
||||
}
|
||||
|
@ -19,22 +19,29 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -97,8 +104,18 @@ private void checkUsedResource(MockRM rm, String queueName, int memory,
|
||||
.getMemory());
|
||||
}
|
||||
|
||||
private void checkUserUsedResource(MockRM rm, String queueName,
|
||||
String userName, String partition, int memory) {
|
||||
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
|
||||
LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
|
||||
LeafQueue.User user = queue.getUser(userName);
|
||||
Assert.assertEquals(memory,
|
||||
user.getResourceUsage().getUsed(partition).getMemory());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testResourceUsage() throws Exception {
|
||||
public void testRequestContainerAfterNodePartitionUpdated()
|
||||
throws Exception {
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
|
||||
"z"));
|
||||
@ -160,7 +177,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNodeUpdate() throws Exception {
|
||||
public void testResourceUsageWhenNodeUpdatesPartition()
|
||||
throws Exception {
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
|
||||
|
||||
@ -184,7 +202,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 8000);
|
||||
MockNM nm3 = rm.registerNode("h3:1234", 8000);
|
||||
|
||||
ContainerId containerId;
|
||||
ContainerId containerId1;
|
||||
ContainerId containerId2;
|
||||
|
||||
// launch an app to queue a1 (label = x), and check all container will
|
||||
// be allocated in h1
|
||||
@ -193,9 +212,9 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
|
||||
// request a container.
|
||||
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
|
||||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
|
||||
// check used resource:
|
||||
@ -203,55 +222,205 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
checkUsedResource(rm, "a", 1024, "x");
|
||||
checkUsedResource(rm, "a", 1024);
|
||||
|
||||
// change h1's label to z, container should be killed
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
|
||||
toSet("z")));
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.KILLED, 10 * 1000));
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||
|
||||
// check used resource:
|
||||
// queue-a used x=0G, ""=1G ("" not changed)
|
||||
// change h1's label to z
|
||||
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
|
||||
toSet("z"))));
|
||||
checkUsedResource(rm, "a", 0, "x");
|
||||
checkUsedResource(rm, "a", 1024, "z");
|
||||
checkUsedResource(rm, "a", 1024);
|
||||
checkUsedResource(rm, "root", 0, "x");
|
||||
checkUsedResource(rm, "root", 1024, "z");
|
||||
checkUsedResource(rm, "root", 1024);
|
||||
checkUserUsedResource(rm, "a", "user", "x", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "z", 1024);
|
||||
Assert.assertEquals(0,
|
||||
app.getAppAttemptResourceUsage().getUsed("x").getMemory());
|
||||
Assert.assertEquals(1024,
|
||||
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
|
||||
|
||||
// request a container with label = y
|
||||
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
|
||||
// change h1's label to y
|
||||
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
|
||||
toSet("y"))));
|
||||
checkUsedResource(rm, "a", 0, "x");
|
||||
checkUsedResource(rm, "a", 1024, "y");
|
||||
checkUsedResource(rm, "a", 0, "z");
|
||||
checkUsedResource(rm, "a", 1024);
|
||||
checkUsedResource(rm, "root", 0, "x");
|
||||
checkUsedResource(rm, "root", 1024, "y");
|
||||
checkUsedResource(rm, "root", 0, "z");
|
||||
checkUsedResource(rm, "root", 1024);
|
||||
checkUserUsedResource(rm, "a", "user", "x", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "y", 1024);
|
||||
checkUserUsedResource(rm, "a", "user", "z", 0);
|
||||
Assert.assertEquals(0,
|
||||
app.getAppAttemptResourceUsage().getUsed("x").getMemory());
|
||||
Assert.assertEquals(1024,
|
||||
app.getAppAttemptResourceUsage().getUsed("y").getMemory());
|
||||
Assert.assertEquals(0,
|
||||
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
|
||||
|
||||
// change h1's label to no label
|
||||
Set<String> emptyLabels = new HashSet<>();
|
||||
Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
|
||||
emptyLabels);
|
||||
cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
|
||||
checkUsedResource(rm, "a", 0, "x");
|
||||
checkUsedResource(rm, "a", 0, "y");
|
||||
checkUsedResource(rm, "a", 0, "z");
|
||||
checkUsedResource(rm, "a", 2048);
|
||||
checkUsedResource(rm, "root", 0, "x");
|
||||
checkUsedResource(rm, "root", 0, "y");
|
||||
checkUsedResource(rm, "root", 0, "z");
|
||||
checkUsedResource(rm, "root", 2048);
|
||||
checkUserUsedResource(rm, "a", "user", "x", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "y", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "z", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "", 2048);
|
||||
Assert.assertEquals(0,
|
||||
app.getAppAttemptResourceUsage().getUsed("x").getMemory());
|
||||
Assert.assertEquals(0,
|
||||
app.getAppAttemptResourceUsage().getUsed("y").getMemory());
|
||||
Assert.assertEquals(0,
|
||||
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
|
||||
Assert.assertEquals(2048,
|
||||
app.getAppAttemptResourceUsage().getUsed("").getMemory());
|
||||
|
||||
// Finish the two containers, we should see used resource becomes 0
|
||||
cs.completedContainer(cs.getRMContainer(containerId2),
|
||||
ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL);
|
||||
cs.completedContainer(cs.getRMContainer(containerId1),
|
||||
ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL);
|
||||
|
||||
checkUsedResource(rm, "a", 0, "x");
|
||||
checkUsedResource(rm, "a", 0, "y");
|
||||
checkUsedResource(rm, "a", 0, "z");
|
||||
checkUsedResource(rm, "a", 0);
|
||||
checkUsedResource(rm, "root", 0, "x");
|
||||
checkUsedResource(rm, "root", 0, "y");
|
||||
checkUsedResource(rm, "root", 0, "z");
|
||||
checkUsedResource(rm, "root", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "x", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "y", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "z", 0);
|
||||
checkUserUsedResource(rm, "a", "user", "", 0);
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testComplexResourceUsageWhenNodeUpdatesPartition()
|
||||
throws Exception {
|
||||
/*
|
||||
* This test is similar to testResourceUsageWhenNodeUpdatesPartition, this
|
||||
* will include multiple applications, multiple users and multiple
|
||||
* containers running on a single node, size of each container is 1G
|
||||
*
|
||||
* Node 1
|
||||
* ------
|
||||
* App1-container3
|
||||
* App2-container2
|
||||
* App2-Container3
|
||||
*
|
||||
* Node 2
|
||||
* ------
|
||||
* App2-container1
|
||||
* App1-container1
|
||||
* App1-container2
|
||||
*/
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
|
||||
|
||||
// set mapping:
|
||||
// h1 -> x
|
||||
// h2 -> y
|
||||
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
rm.getRMContext().setNodeLabelManager(mgr);
|
||||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 80000);
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 80000);
|
||||
|
||||
// app1
|
||||
RMApp app1 = rm.submitApp(GB, "app", "u1", null, "a");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
|
||||
|
||||
// c2 on n1, c3 on n2
|
||||
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
|
||||
ContainerId containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
|
||||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm2, containerId,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
|
||||
// check used resource:
|
||||
// queue-a used y=1G, ""=1G
|
||||
checkUsedResource(rm, "a", 1024, "y");
|
||||
checkUsedResource(rm, "a", 1024);
|
||||
|
||||
// change h2's label to no label, container should be killed
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET));
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.KILLED, 10 * 1000));
|
||||
|
||||
// check used resource:
|
||||
// queue-a used x=0G, y=0G, ""=1G ("" not changed)
|
||||
checkUsedResource(rm, "a", 0, "x");
|
||||
checkUsedResource(rm, "a", 0, "y");
|
||||
checkUsedResource(rm, "a", 1024);
|
||||
// app2
|
||||
RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
|
||||
|
||||
// c2/c3 on n1
|
||||
am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
|
||||
containerId =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
|
||||
// change h3's label to z, AM container should be killed
|
||||
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
|
||||
toSet("z")));
|
||||
ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId,
|
||||
RMContainerState.KILLED, 10 * 1000));
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
|
||||
// check used resource:
|
||||
// queue-a used x=0G, y=0G, ""=1G ("" not changed)
|
||||
// queue-a used x=1G, ""=1G
|
||||
checkUsedResource(rm, "a", 3 * GB, "x");
|
||||
checkUsedResource(rm, "a", 3 * GB);
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
FiCaSchedulerApp application1 =
|
||||
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||
FiCaSchedulerApp application2 =
|
||||
cs.getApplicationAttempt(am2.getApplicationAttemptId());
|
||||
|
||||
// change h1's label to z
|
||||
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
|
||||
toSet("z"))));
|
||||
checkUsedResource(rm, "a", 0, "x");
|
||||
checkUsedResource(rm, "a", 0, "y");
|
||||
checkUsedResource(rm, "a", 0);
|
||||
checkUsedResource(rm, "a", 3 * GB, "z");
|
||||
checkUsedResource(rm, "a", 3 * GB);
|
||||
checkUsedResource(rm, "root", 0, "x");
|
||||
checkUsedResource(rm, "root", 3 * GB, "z");
|
||||
checkUsedResource(rm, "root", 3 * GB);
|
||||
checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
|
||||
checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
|
||||
checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
|
||||
checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
|
||||
checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
|
||||
checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
|
||||
Assert.assertEquals(0,
|
||||
application1.getAppAttemptResourceUsage().getUsed("x").getMemory());
|
||||
Assert.assertEquals(1 * GB,
|
||||
application1.getAppAttemptResourceUsage().getUsed("z").getMemory());
|
||||
Assert.assertEquals(2 * GB,
|
||||
application1.getAppAttemptResourceUsage().getUsed("").getMemory());
|
||||
Assert.assertEquals(0,
|
||||
application2.getAppAttemptResourceUsage().getUsed("x").getMemory());
|
||||
Assert.assertEquals(2 * GB,
|
||||
application2.getAppAttemptResourceUsage().getUsed("z").getMemory());
|
||||
Assert.assertEquals(1 * GB,
|
||||
application2.getAppAttemptResourceUsage().getUsed("").getMemory());
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user