diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index b5ae4f5b3c..26d35ac897 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -99,6 +99,11 @@ public class RMNodeWrapper implements RMNode { return node.getTotalCapability(); } + @Override + public Resource getAllocatedContainerResource() { + return node.getAllocatedContainerResource(); + } + @Override public String getRackName() { return node.getRackName(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index e6205d2dac..5d60b4fbe0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Node managers information on available resources @@ -104,6 +105,17 @@ public interface RMNode { */ public Resource getTotalCapability(); + /** + * The total allocated resources to 
containers. + * This will include the sum of Guaranteed and Opportunistic + * containers queued + running + paused on the node. + * @return the total allocated resources, including all Guaranteed and + * Opportunistic containers in queued, running and paused states. + */ + default Resource getAllocatedContainerResource() { + return Resources.none(); + } + /** * If the total available resources has been updated. * @return If the capability has been updated. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e5f6f2c85b..b8aaea5de3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler { /* Snapshot of total resources before receiving decommissioning command */ private volatile Resource originalTotalCapability; private volatile Resource totalCapability; + private volatile Resource allocatedContainerResource = + Resource.newInstance(Resources.none()); private volatile boolean updatedCapability = false; private final Node node; @@ -464,6 +466,11 @@ public class RMNodeImpl implements RMNode, EventHandler { return this.totalCapability; } + @Override + public Resource getAllocatedContainerResource() { + return this.allocatedContainerResource; + } + @Override public boolean isUpdatedCapability() { return this.updatedCapability; @@ -952,13 +959,22 @@ public class RMNodeImpl implements RMNode, 
EventHandler { ClusterMetrics.getMetrics().decrDecommisionedNMs(); } containers = startEvent.getNMContainerStatuses(); + final Resource allocatedResource = Resource.newInstance( + Resources.none()); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { - if (container.getContainerState() == ContainerState.RUNNING) { - rmNode.launchedContainers.add(container.getContainerId()); + if (container.getContainerState() == ContainerState.NEW || + container.getContainerState() == ContainerState.RUNNING) { + Resources.addTo(allocatedResource, + container.getAllocatedResource()); + if (container.getContainerState() == ContainerState.RUNNING) { + rmNode.launchedContainers.add(container.getContainerId()); + } } } } + + rmNode.allocatedContainerResource = allocatedResource; } if (null != startEvent.getRunningApplications()) { @@ -1554,6 +1570,8 @@ public class RMNodeImpl implements RMNode, EventHandler { List> needUpdateContainers = new ArrayList>(); int numRemoteRunningContainers = 0; + final Resource allocatedResource = Resource.newInstance(Resources.none()); + for (ContainerStatus remoteContainer : containerStatuses) { ContainerId containerId = remoteContainer.getContainerId(); @@ -1622,8 +1640,16 @@ public class RMNodeImpl implements RMNode, EventHandler { containerAllocationExpirer .unregister(new AllocationExpirationInfo(containerId)); } + + if ((remoteContainer.getState() == ContainerState.RUNNING || + remoteContainer.getState() == ContainerState.NEW) && + remoteContainer.getCapability() != null) { + Resources.addTo(allocatedResource, remoteContainer.getCapability()); + } } + allocatedContainerResource = allocatedResource; + List lostContainers = findLostContainers(numRemoteRunningContainers, containerStatuses); for (ContainerStatus remoteContainer : lostContainers) { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3346b57d98..db14d422f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -43,13 +43,17 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -79,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -231,6 +236,32 @@ public class TestRMNodeTransitions { return event; } + private static ContainerStatus getMockContainerStatus( + final ContainerId containerId, final Resource capability, + final ContainerState containerState) { + return getMockContainerStatus(containerId, capability, containerState, + ExecutionType.GUARANTEED); + } + + private static ContainerStatus getMockContainerStatus( + final ContainerId containerId, final Resource capability, + final ContainerState containerState, final ExecutionType executionType) { + final ContainerStatus containerStatus = mock(ContainerStatus.class); + doReturn(containerId).when(containerStatus).getContainerId(); + doReturn(containerState).when(containerStatus).getState(); + doReturn(capability).when(containerStatus).getCapability(); + doReturn(executionType).when(containerStatus).getExecutionType(); + return containerStatus; + } + + private static NMContainerStatus createNMContainerStatus( + final ContainerId containerId, final ExecutionType executionType, + final ContainerState containerState, final Resource capability) { + return NMContainerStatus.newInstance(containerId, 0, containerState, + capability, "", 0, Priority.newInstance(0), 0, + CommonNodeLabelsManager.NO_LABEL, executionType, -1); + } + @Test (timeout = 5000) public void testExpiredContainer() { NodeStatus mockNodeStatus = createMockNodeStatus(); @@ -248,8 +279,8 @@ public class TestRMNodeTransitions { // Now verify that scheduler isn't notified of an expired container // by checking number of 'completedContainers' it got in the previous event RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatus = 
mock(ContainerStatus.class); - doReturn(completedContainerId).when(containerStatus).getContainerId(); + ContainerStatus containerStatus = getMockContainerStatus( + completedContainerId, null, ContainerState.COMPLETE); doReturn(Collections.singletonList(containerStatus)). when(statusEvent).getContainers(); node.handle(statusEvent); @@ -321,12 +352,13 @@ public class TestRMNodeTransitions { RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null); RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class); - ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class); - ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class); + ContainerStatus containerStatusFromNode1 = getMockContainerStatus( + completedContainerIdFromNode1, null, ContainerState.COMPLETE); + ContainerStatus containerStatusFromNode2_1 = getMockContainerStatus( + completedContainerIdFromNode2_1, null, ContainerState.COMPLETE); + ContainerStatus containerStatusFromNode2_2 = getMockContainerStatus( + completedContainerIdFromNode2_2, null, ContainerState.COMPLETE); - doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1) - .getContainerId(); doReturn(Collections.singletonList(containerStatusFromNode1)) .when(statusEventFromNode1).getContainers(); node.handle(statusEventFromNode1); @@ -336,13 +368,9 @@ public class TestRMNodeTransitions { completedContainers.clear(); - doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1) - .getContainerId(); doReturn(Collections.singletonList(containerStatusFromNode2_1)) .when(statusEventFromNode2_1).getContainers(); - doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2) - .getContainerId(); doReturn(Collections.singletonList(containerStatusFromNode2_2)) .when(statusEventFromNode2_2).getContainers(); @@ -358,6 +386,181 @@ public class TestRMNodeTransitions { .getContainerId()); } 
+ /** + * Tests that allocated resources are counted correctly on new nodes + * that are added to the cluster. + */ + @Test + public void testAddWithAllocatedContainers() { + NodeStatus mockNodeStatus = createMockNodeStatus(); + RMNodeImpl node = getNewNode(); + ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); + + // Independently computed expected allocated resource to verify against + final Resource expectedResource = Resource.newInstance(Resources.none()); + + // Guaranteed containers + final ContainerId newContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 0); + final Resource newContainerCapability = + Resource.newInstance(100, 1); + Resources.addTo(expectedResource, newContainerCapability); + final NMContainerStatus newContainerStatus = createNMContainerStatus( + newContainerId, ExecutionType.GUARANTEED, + ContainerState.NEW, newContainerCapability); + + final ContainerId runningContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 1); + final Resource runningContainerCapability = + Resource.newInstance(200, 2); + Resources.addTo(expectedResource, runningContainerCapability); + final NMContainerStatus runningContainerStatus = createNMContainerStatus( + runningContainerId, ExecutionType.GUARANTEED, + ContainerState.RUNNING, runningContainerCapability); + + // Opportunistic containers + final ContainerId newOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 2); + final Resource newOppContainerCapability = + Resource.newInstance(300, 3); + Resources.addTo(expectedResource, newOppContainerCapability); + final NMContainerStatus newOppContainerStatus = createNMContainerStatus( + newOppContainerId, ExecutionType.OPPORTUNISTIC, + ContainerState.NEW, newOppContainerCapability); + + final ContainerId runningOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 3); + final Resource 
runningOppContainerCapability = + Resource.newInstance(400, 4); + Resources.addTo(expectedResource, runningOppContainerCapability); + final NMContainerStatus runningOppContainerStatus = createNMContainerStatus( + runningOppContainerId, ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, runningOppContainerCapability); + + node.handle(new RMNodeStartedEvent(node.getNodeID(), + Arrays.asList(newContainerStatus, runningContainerStatus, + newOppContainerStatus, runningOppContainerStatus), + null, mockNodeStatus)); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + Assert.assertEquals(expectedResource, node.getAllocatedContainerResource()); + } + + /** + * Tests that allocated container resources are counted correctly in + * {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode} + * upon a node update. Resources should be counted for both GUARANTEED + * and OPPORTUNISTIC containers. 
+ */ + @Test (timeout = 5000) + public void testAllocatedContainerUpdate() { + NodeStatus mockNodeStatus = createMockNodeStatus(); + //Start the node + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); + + // Make sure that the node starts with no allocated resources + Assert.assertEquals(Resources.none(), node.getAllocatedContainerResource()); + + ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); + final ContainerId newContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 0); + final ContainerId runningContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 1); + + rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class)); + + RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); + + final List containerStatuses = new ArrayList<>(); + + // Use different memory and VCores for new and running state containers + // to test that they add up correctly + final Resource newContainerCapability = + Resource.newInstance(100, 1); + final Resource runningContainerCapability = + Resource.newInstance(200, 2); + final Resource completedContainerCapability = + Resource.newInstance(50, 3); + final ContainerStatus newContainerStatusFromNode = getMockContainerStatus( + newContainerId, newContainerCapability, ContainerState.NEW); + final ContainerStatus runningContainerStatusFromNode = + getMockContainerStatus(runningContainerId, runningContainerCapability, + ContainerState.RUNNING); + + containerStatuses.addAll(Arrays.asList( + newContainerStatusFromNode, runningContainerStatusFromNode)); + doReturn(containerStatuses).when(statusEventFromNode1).getContainers(); + node.handle(statusEventFromNode1); + Assert.assertEquals(Resource.newInstance(300, 3), + node.getAllocatedContainerResource()); + + final ContainerId newOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 2); + final ContainerId runningOppContainerId = 
BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 3); + + // Reuse the capabilities from the first check for the opportunistic case + RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null); + final ContainerStatus newOppContainerStatusFromNode = + getMockContainerStatus(newOppContainerId, newContainerCapability, + ContainerState.NEW, ExecutionType.OPPORTUNISTIC); + final ContainerStatus runningOppContainerStatusFromNode = + getMockContainerStatus(runningOppContainerId, + runningContainerCapability, ContainerState.RUNNING, + ExecutionType.OPPORTUNISTIC); + + containerStatuses.addAll(Arrays.asList( + newOppContainerStatusFromNode, runningOppContainerStatusFromNode)); + + // Pass in both guaranteed and opportunistic container statuses + doReturn(containerStatuses).when(statusEventFromNode2).getContainers(); + + node.handle(statusEventFromNode2); + + // The result here should be double the first check, + // since allocated resources are doubled, just + // with different execution types + Assert.assertEquals(Resource.newInstance(600, 6), + node.getAllocatedContainerResource()); + + RMNodeStatusEvent statusEventFromNode3 = getMockRMNodeStatusEvent(null); + final ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 4); + final ContainerId completedOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 5); + final ContainerStatus completedContainerStatusFromNode = + getMockContainerStatus(completedContainerId, completedContainerCapability, + ContainerState.COMPLETE, ExecutionType.GUARANTEED); + final ContainerStatus completedOppContainerStatusFromNode = + getMockContainerStatus(completedOppContainerId, + completedContainerCapability, ContainerState.COMPLETE, + ExecutionType.OPPORTUNISTIC); + + containerStatuses.addAll(Arrays.asList( + completedContainerStatusFromNode, completedOppContainerStatusFromNode)); + + 
doReturn(containerStatuses).when(statusEventFromNode3).getContainers(); + node.handle(statusEventFromNode3); + + // Adding completed containers should not have changed + // the resources allocated + Assert.assertEquals(Resource.newInstance(600, 6), + node.getAllocatedContainerResource()); + + RMNodeStatusEvent emptyStatusEventFromNode = + getMockRMNodeStatusEvent(null); + + doReturn(Collections.emptyList()) + .when(emptyStatusEventFromNode).getContainers(); + node.handle(emptyStatusEventFromNode); + + // Passing an empty containers list should yield no resources allocated + Assert.assertEquals(Resources.none(), + node.getAllocatedContainerResource()); + } + @Test (timeout = 5000) public void testStatusChange(){ NodeStatus mockNodeStatus = createMockNodeStatus(); @@ -376,14 +579,14 @@ public class TestRMNodeTransitions { RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatus1 = mock(ContainerStatus.class); - ContainerStatus containerStatus2 = mock(ContainerStatus.class); + ContainerStatus containerStatus1 = getMockContainerStatus( + completedContainerId1, null, null); + ContainerStatus containerStatus2 = getMockContainerStatus( + completedContainerId2, null, null); - doReturn(completedContainerId1).when(containerStatus1).getContainerId(); doReturn(Collections.singletonList(containerStatus1)) .when(statusEvent1).getContainers(); - doReturn(completedContainerId2).when(containerStatus2).getContainerId(); doReturn(Collections.singletonList(containerStatus2)) .when(statusEvent2).getContainers(); @@ -1153,9 +1356,9 @@ public class TestRMNodeTransitions { RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatus1 = mock(ContainerStatus.class); + ContainerStatus containerStatus1 = getMockContainerStatus( + completedContainerId1, null, ContainerState.COMPLETE); - 
doReturn(completedContainerId1).when(containerStatus1).getContainerId(); doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) .getContainers();