diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 8646381feb..ada2a0b1ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -118,4 +118,24 @@ public void recoverContainer(Resource clusterResource, * @return default application priority */ public Priority getDefaultApplicationPriority(); + + /** + * Increment Reserved Capacity + * + * @param partition + * asked by application + * @param reservedRes + * reserved resource asked + */ + public void incReservedResource(String partition, Resource reservedRes); + + /** + * Decrement Reserved Capacity + * + * @param partition + * asked by application + * @param reservedRes + * reserved resource asked + */ + public void decReservedResource(String partition, Resource reservedRes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 955f8faca6..6e715fb225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -534,7 +534,31 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, } return true; } - + + @Override + public void incReservedResource(String partition, Resource reservedRes) { + if (partition == null) { + partition = RMNodeLabelsManager.NO_LABEL; + } + + queueUsage.incReserved(partition, reservedRes); + if(null != parent){ + parent.incReservedResource(partition, reservedRes); + } + } + + @Override + public void decReservedResource(String partition, Resource reservedRes) { + if (partition == null) { + partition = RMNodeLabelsManager.NO_LABEL; + } + + queueUsage.decReserved(partition, reservedRes); + if(null != parent){ + parent.decReservedResource(partition, reservedRes); + } + } + @Override public void incPendingResource(String nodeLabel, Resource resourceToInc) { if (nodeLabel == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 2f981a7482..9cdcb72789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -186,6 +186,8 @@ private static void updateUsedCapacity(final ResourceCalculator rc, String nodePartition) { float absoluteUsedCapacity = 0.0f; float usedCapacity = 0.0f; + float 
reservedCapacity = 0.0f; + float absoluteReservedCapacity = 0.0f; if (Resources.greaterThan(rc, totalPartitionResource, totalPartitionResource, Resources.none())) { @@ -207,11 +209,22 @@ private static void updateUsedCapacity(final ResourceCalculator rc, usedCapacity = Resources.divide(rc, totalPartitionResource, usedResource, queueGuranteedResource); + + Resource resResource = queueResourceUsage.getReserved(nodePartition); + reservedCapacity = + Resources.divide(rc, totalPartitionResource, resResource, + queueGuranteedResource); + absoluteReservedCapacity = + Resources.divide(rc, totalPartitionResource, resResource, + totalPartitionResource); } queueCapacities .setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity); queueCapacities.setUsedCapacity(nodePartition, usedCapacity); + queueCapacities.setReservedCapacity(nodePartition, reservedCapacity); + queueCapacities + .setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity); } private static Resource getNonPartitionedMaxAvailableResourceToQueue( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3dc209082d..9a74c2288c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -963,6 +963,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, node.getPartition(), reservedOrAllocatedRMContainer, assignment.isIncreasedAllocation()); 
+ // Update reserved metrics + Resource reservedRes = assignment.getAssignmentInformation() + .getReserved(); + if (reservedRes != null && !reservedRes.equals(Resources.none())) { + incReservedResource(node.getPartition(), reservedRes); + } + // Done return assignment; } else if (assignment.getSkipped()) { @@ -1315,7 +1322,14 @@ public void completedContainer(Resource clusterResource, // Book-keeping if (removed) { - + + // track reserved resource for metrics, for normal container + // getReservedResource will be null. + Resource reservedRes = rmContainer.getReservedResource(); + if (reservedRes != null && !reservedRes.equals(Resources.none())) { + decReservedResource(node.getPartition(), reservedRes); + } + // Inform the ordering policy orderingPolicy.containerReleased(application, rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index f2c26327ae..cc4af3dfb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -50,7 +50,7 @@ public QueueCapacities(boolean isRoot) { // Usage enum here to make implement cleaner private enum CapacityType { USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5), - MAX_AM_PERC(6); + MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8); private int idx; @@ -76,6 +76,8 @@ public String toString() { sb.append("cap=" + capacitiesArr[4] + 
"%, "); sb.append("abs_cap=" + capacitiesArr[5] + "%}"); sb.append("max_am_perc=" + capacitiesArr[6] + "%}"); + sb.append("reserved_cap=" + capacitiesArr[7] + "%}"); + sb.append("abs_reserved_cap=" + capacitiesArr[8] + "%}"); return sb.toString(); } } @@ -234,6 +236,40 @@ public void setMaxAMResourcePercentage(float value) { _set(NL, CapacityType.MAX_AM_PERC, value); } + /* Reserved Capacity Getter and Setter */ + public float getReservedCapacity() { + return _get(NL, CapacityType.RESERVED_CAP); + } + + public float getReservedCapacity(String label) { + return _get(label, CapacityType.RESERVED_CAP); + } + + public void setReservedCapacity(float value) { + _set(NL, CapacityType.RESERVED_CAP, value); + } + + public void setReservedCapacity(String label, float value) { + _set(label, CapacityType.RESERVED_CAP, value); + } + + /* Absolute Reserved Capacity Getter and Setter */ + public float getAbsoluteReservedCapacity() { + return _get(NL, CapacityType.ABS_RESERVED_CAP); + } + + public float getAbsoluteReservedCapacity(String label) { + return _get(label, CapacityType.ABS_RESERVED_CAP); + } + + public void setAbsoluteReservedCapacity(float value) { + _set(NL, CapacityType.ABS_RESERVED_CAP, value); + } + + public void setAbsoluteReservedCapacity(String label, float value) { + _set(label, CapacityType.ABS_RESERVED_CAP, value); + } + /** * Clear configurable fields, like * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index f82411daed..a33084f80c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -340,6 +340,14 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) { public void decPendingResource(String nodeLabel, Resource resourceToDec) { } + @Override + public void incReservedResource(String nodeLabel, Resource resourceToInc) { + } + + @Override + public void decReservedResource(String nodeLabel, Resource resourceToDec) { + } + @Override public Priority getDefaultApplicationPriority() { // TODO add implementation for FSParentQueue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index cf125011f9..fba4c1318d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -220,6 +220,18 @@ public Priority getDefaultApplicationPriority() { // TODO add implementation for FIFO scheduler return null; } + + @Override + public void incReservedResource(String partition, Resource reservedRes) { + // TODO add implementation for FIFO scheduler + + } + + @Override + public void decReservedResource(String partition, Resource reservedRes) { + // TODO add 
implementation for FIFO scheduler + + } }; public FifoScheduler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 1f22a062d6..84eba10961 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -367,7 +367,8 @@ public void testExcessReservationWillBeUnreserved() throws Exception { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - + LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + // Do node heartbeats 2 times // First time will allocate container for app1, second time will reserve // container for app2 @@ -393,7 +394,11 @@ public void testExcessReservationWillBeUnreserved() throws Exception { // Usage of queue = 4G + 2 * 1G + 4G (reserved) Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage() .getUsed().getMemory()); - + Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage() + .getReserved().getMemory()); + Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved() + .getMemory()); + // Cancel asks of app2 and re-kick RM am2.allocate("*", 4 * GB, 0, new ArrayList()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -405,6 +410,10 @@ public void testExcessReservationWillBeUnreserved() throws Exception { 
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage() .getUsed().getMemory()); + Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage() + .getReserved().getMemory()); + Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved() + .getMemory()); rm1.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 1ee201dfc9..dc74593eef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -462,7 +462,84 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } - + + @Test (timeout = 120000) + public void testContainerReservationWithLabels() throws Exception { + // This test is pretty much similar to testContainerAllocateWithLabel. 
+ // The difference is that this test doesn't specify a label expression in + // the ResourceRequest; + // instead, it uses the default queue label expression + + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", + "z")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + toSet("x"), NodeId.newInstance("h2", 0), toSet("y"), + NodeId.newInstance("h3", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM( + TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x + rm1.registerNode("h2:1234", 8 * GB); // label = y + rm1.registerNode("h3:1234", 8 * GB); // label = x + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check that all containers will + // be allocated on h1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. 
+ am1.allocate("*", 4 * GB, 2, new ArrayList()); + containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + + // Do node heartbeats 2 times + // First time will allocate container for app1, second time will reserve + // container for app1 + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // Check that a 4G container is allocated for app1, and 4G is reserved + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + Assert.assertTrue(schedulerApp1.getReservedContainers().size() > 0); + Assert.assertEquals(9 * GB, cs.getRootQueue().getQueueResourceUsage() + .getUsed("x").getMemory()); + Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage() + .getReserved("x").getMemory()); + Assert.assertEquals(4 * GB, + leafQueue.getQueueResourceUsage().getReserved("x").getMemory()); + + // Cancel asks of app1 and re-kick RM + am1.allocate("*", 4 * GB, 0, new ArrayList()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage() + .getUsed("x").getMemory()); + Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage() + .getReserved("x").getMemory()); + Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved("x") + .getMemory()); + rm1.close(); + } + private void checkPendingResource(MockRM rm, int priority, ApplicationAttemptId attemptId, int memory) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java index 9d2fa15d4e..356ed46467 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java @@ -44,7 +44,9 @@ public static Collection getParameters() { { "AbsoluteUsedCapacity" }, { "MaximumCapacity" }, { "AbsoluteMaximumCapacity" }, - { "MaxAMResourcePercentage" } }); + { "MaxAMResourcePercentage" }, + { "ReservedCapacity" }, + { "AbsoluteReservedCapacity" }}); } public TestQueueCapacities(String suffix) {