YARN-4934. Reserved Resource for QueueMetrics needs to be handled correctly in few cases. (Sunil G via wangda)
This commit is contained in:
parent
e6c0742012
commit
fdc46bfb37
@ -1348,13 +1348,6 @@ public void completedContainer(Resource clusterResource,
|
|||||||
// Book-keeping
|
// Book-keeping
|
||||||
if (removed) {
|
if (removed) {
|
||||||
|
|
||||||
// track reserved resource for metrics, for normal container
|
|
||||||
// getReservedResource will be null.
|
|
||||||
Resource reservedRes = rmContainer.getReservedResource();
|
|
||||||
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
|
|
||||||
decReservedResource(node.getPartition(), reservedRes);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inform the ordering policy
|
// Inform the ordering policy
|
||||||
orderingPolicy.containerReleased(application, rmContainer);
|
orderingPolicy.containerReleased(application, rmContainer);
|
||||||
|
|
||||||
|
@ -246,6 +246,8 @@ public synchronized boolean unreserve(Priority priority,
|
|||||||
// Update reserved metrics
|
// Update reserved metrics
|
||||||
queue.getMetrics().unreserveResource(getUser(),
|
queue.getMetrics().unreserveResource(getUser(),
|
||||||
rmContainer.getReservedResource());
|
rmContainer.getReservedResource());
|
||||||
|
queue.decReservedResource(node.getPartition(),
|
||||||
|
rmContainer.getReservedResource());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
@ -37,6 +38,8 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
@ -50,8 +53,13 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
@ -418,4 +426,182 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
|
|||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAllocationForReservedContainer() throws Exception {
|
||||||
|
/**
|
||||||
|
* Test case: Submit two application (app1/app2) to a queue. And there's one
|
||||||
|
* node with 8G resource in the cluster. App1 allocates a 6G container, Then
|
||||||
|
* app2 asks for a 4G container. App2's request will be reserved on the
|
||||||
|
* node.
|
||||||
|
*
|
||||||
|
* Before next node heartbeat, app1 container is completed/killed. So app1
|
||||||
|
* container which was reserved will be allocated.
|
||||||
|
*/
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm1 = new MockRM();
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
|
||||||
|
|
||||||
|
// launch an app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// launch another app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||||
|
|
||||||
|
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
||||||
|
|
||||||
|
// Do node heartbeats 2 times
|
||||||
|
// First time will allocate container for app1, second time will reserve
|
||||||
|
// container for app2
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
// App2 will get preference to be allocated on node1, and node1 will be all
|
||||||
|
// used by App2.
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
FiCaSchedulerApp schedulerApp2 =
|
||||||
|
cs.getApplicationAttempt(am2.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// Check if a 4G container allocated for app1, and nothing allocated for app2
|
||||||
|
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
||||||
|
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
|
||||||
|
|
||||||
|
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
|
||||||
|
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
|
||||||
|
.getUnallocatedResource().getMemory());
|
||||||
|
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
|
||||||
|
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
|
||||||
|
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getUsed().getMemory());
|
||||||
|
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getReserved().getMemory());
|
||||||
|
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
|
||||||
|
.getMemory());
|
||||||
|
|
||||||
|
// Mark one app1 container as killed/completed and re-kick RM
|
||||||
|
for (RMContainer container : schedulerApp1.getLiveContainers()) {
|
||||||
|
if (container.isAMContainer()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
cs.markContainerForKillable(container);
|
||||||
|
}
|
||||||
|
// Cancel asks of app1 and re-kick RM
|
||||||
|
am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
// Check 4G container cancelled for app1, and one container allocated for
|
||||||
|
// app2
|
||||||
|
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
||||||
|
Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
|
||||||
|
|
||||||
|
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
|
||||||
|
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
|
||||||
|
.getUnallocatedResource().getMemory());
|
||||||
|
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
|
||||||
|
// Usage of queue = 4G + 2 * 1G
|
||||||
|
Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getUsed().getMemory());
|
||||||
|
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getReserved().getMemory());
|
||||||
|
Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
|
||||||
|
.getMemory());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
|
||||||
|
/**
|
||||||
|
* Test case: Submit two application (app1/app2) to a queue. And there's one
|
||||||
|
* node with 8G resource in the cluster. App1 allocates a 6G container, Then
|
||||||
|
* app2 asks for a 4G container. App2's request will be reserved on the
|
||||||
|
* node.
|
||||||
|
*
|
||||||
|
* Before next node heartbeat, app1 container is completed/killed. So app1
|
||||||
|
* container which was reserved will be allocated.
|
||||||
|
*/
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm1 = new MockRM();
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
|
||||||
|
|
||||||
|
// launch an app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// launch another app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||||
|
|
||||||
|
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
||||||
|
|
||||||
|
// Do node heartbeats 2 times
|
||||||
|
// First time will allocate container for app1, second time will reserve
|
||||||
|
// container for app2
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
// App2 will get preference to be allocated on node1, and node1 will be all
|
||||||
|
// used by App2.
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
FiCaSchedulerApp schedulerApp2 =
|
||||||
|
cs.getApplicationAttempt(am2.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// Check if a 4G container allocated for app1, and nothing allocated for app2
|
||||||
|
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
||||||
|
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
|
||||||
|
|
||||||
|
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
|
||||||
|
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
|
||||||
|
.getUnallocatedResource().getMemory());
|
||||||
|
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
|
||||||
|
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
|
||||||
|
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getUsed().getMemory());
|
||||||
|
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getReserved().getMemory());
|
||||||
|
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
|
||||||
|
.getMemory());
|
||||||
|
|
||||||
|
// Remove the node
|
||||||
|
cs.handle(new NodeRemovedSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
// Check all container cancelled for app1 and app2
|
||||||
|
Assert.assertEquals(0, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(0, schedulerApp2.getLiveContainers().size());
|
||||||
|
Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
|
||||||
|
|
||||||
|
// Usage and Reserved capacity of queue is 0
|
||||||
|
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getUsed().getMemory());
|
||||||
|
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
|
||||||
|
.getReserved().getMemory());
|
||||||
|
Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
|
||||||
|
.getMemory());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user