From 1f53ae79728065417c6a99eb6fcc8d3a080ab4cc Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Fri, 8 Sep 2017 09:24:05 -0700 Subject: [PATCH] YARN-6849. NMContainerStatus should have the Container ExecutionType. (Kartheek Muthyala via asuresh) --- .../protocolrecords/NMContainerStatus.java | 17 ++- .../impl/pb/NMContainerStatusPBImpl.java | 30 +++++ .../yarn_server_common_service_protos.proto | 1 + .../container/ContainerImpl.java | 3 +- .../resourcemanager/rmnode/RMNodeImpl.java | 3 +- .../scheduler/AbstractYarnScheduler.java | 1 + .../scheduler/AppSchedulingInfo.java | 4 + .../SchedulerApplicationAttempt.java | 8 +- .../scheduler/capacity/LeafQueue.java | 4 +- .../scheduler/capacity/ParentQueue.java | 4 + .../server/resourcemanager/TestRMRestart.java | 4 +- .../TestResourceTrackerService.java | 108 ++++++++++++++++++ 12 files changed, 178 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java index ed950ce928..180add8061 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -40,13 +41,14 @@ public static NMContainerStatus newInstance(ContainerId containerId, long creationTime) { return newInstance(containerId, version, containerState, allocatedResource, diagnostics, containerExitStatus, priority, creationTime, - CommonNodeLabelsManager.NO_LABEL); + CommonNodeLabelsManager.NO_LABEL, ExecutionType.GUARANTEED); } public static NMContainerStatus newInstance(ContainerId containerId, int version, ContainerState containerState, Resource allocatedResource, String diagnostics, int containerExitStatus, Priority priority, - long creationTime, String nodeLabelExpression) { + long creationTime, String nodeLabelExpression, + ExecutionType executionType) { NMContainerStatus status = Records.newRecord(NMContainerStatus.class); status.setContainerId(containerId); @@ -58,6 +60,7 @@ public static NMContainerStatus newInstance(ContainerId containerId, status.setPriority(priority); status.setCreationTime(creationTime); status.setNodeLabelExpression(nodeLabelExpression); + status.setExecutionType(executionType); return status; } @@ -134,4 +137,14 @@ public int getVersion() { public void setVersion(int version) { } + + /** + * Get the ExecutionType of the container. 
+ * @return ExecutionType of the container + */ + public ExecutionType getExecutionType() { + return ExecutionType.GUARANTEED; + } + + public void setExecutionType(ExecutionType executionType) { } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java index 2380391e0e..38df5f6766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -27,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; @@ -249,6 +251,25 @@ public void setNodeLabelExpression(String nodeLabelExpression) { builder.setNodeLabelExpression(nodeLabelExpression); } + @Override + public synchronized ExecutionType getExecutionType() 
{ + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasExecutionType()) { + return ExecutionType.GUARANTEED; + } + return convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public synchronized void setExecutionType(ExecutionType executionType) { + maybeInitBuilder(); + if (executionType == null) { + builder.clearExecutionType(); + return; + } + builder.setExecutionType(convertToProtoFormat(executionType)); + } + private void mergeLocalToBuilder() { if (this.containerId != null && !((ContainerIdPBImpl) containerId).getProto().equals( @@ -313,4 +334,13 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + + private ExecutionType convertFromProtoFormat( + YarnProtos.ExecutionTypeProto e) { + return ProtoUtils.convertFromProtoFormat(e); + } + + private YarnProtos.ExecutionTypeProto convertToProtoFormat(ExecutionType e) { + return ProtoUtils.convertToProtoFormat(e); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c2ba677226..e889cdec82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -174,6 +174,7 @@ message NMContainerStatusProto { optional int64 creation_time = 7; optional string nodeLabelExpression = 8; optional int32 version = 9; + optional ExecutionTypeProto executionType = 10 [default = GUARANTEED]; } message SCMUploaderNotifyRequestProto { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index a768d18bb6..1a48b12d36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -632,7 +632,8 @@ public NMContainerStatus getNMContainerStatus() { getCurrentState(), getResource(), diagnostics.toString(), exitCode, containerTokenIdentifier.getPriority(), containerTokenIdentifier.getCreationTime(), - containerTokenIdentifier.getNodeLabelExpression()); + containerTokenIdentifier.getNodeLabelExpression(), + containerTokenIdentifier.getExecutionType()); } finally { this.readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1bdaa98b16..d270aa3084 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -847,7 +847,8 @@ public void transition(RMNodeImpl 
rmNode, RMNodeEvent event) { containers = startEvent.getNMContainerStatuses(); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { - if (container.getContainerState() == ContainerState.RUNNING) { + if (container.getContainerState() == ContainerState.RUNNING || + container.getContainerState() == ContainerState.SCHEDULED) { rmNode.launchedContainers.add(container.getContainerId()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0c07b3ea7a..fab02a24a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -529,6 +529,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, node.getHttpAddress(), status.getAllocatedResource(), status.getPriority(), null); container.setVersion(status.getVersion()); + container.setExecutionType(status.getExecutionType()); ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = new RMContainerImpl(container, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8acf7d55db..082ec14d43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -525,6 +526,9 @@ public void transferStateFromPreviousAppSchedulingInfo( } public void recoverContainer(RMContainer rmContainer, String partition) { + if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { + return; + } try { this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index f9a72191a6..6a44cae6c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1131,9 +1131,11 @@ public void recoverContainer(SchedulerNode node, } LOG.info("SchedulerAttempt " + getApplicationAttemptId() + " is recovering container " + rmContainer.getContainerId()); - liveContainers.put(rmContainer.getContainerId(), rmContainer); - attemptResourceUsage.incUsed(node.getPartition(), - rmContainer.getContainer().getResource()); + addRMContainer(rmContainer.getContainerId(), rmContainer); + if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { + attemptResourceUsage.incUsed(node.getPartition(), + rmContainer.getContainer().getResource()); + } // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource // is called. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ffe0c0ff89..f24e30aa1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1802,7 +1802,9 @@ public void recoverContainer(Resource clusterResource, if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - + if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { + return; + } // Careful! Locking order is important! 
try { writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 2e48000c09..6800b74f8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -863,6 +864,9 @@ public void recoverContainer(Resource clusterResource, if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } + if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { + return; + } // Careful! Locking order is important! 
try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 5cbcdbcb80..c9dcaef3f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -2097,7 +2098,8 @@ public static NMContainerStatus createNMContainerStatus( NMContainerStatus containerReport = NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(1024, 1), "recover container", 0, - Priority.newInstance(0), 0, nodeLabelExpression); + Priority.newInstance(0), 0, nodeLabelExpression, + ExecutionType.GUARANTEED); return containerReport; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 5ed327868c..41078d092c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -29,8 +29,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,15 +47,19 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -74,10 +81,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; @@ -2026,6 +2037,103 @@ public void tearDown() { } } + @SuppressWarnings("unchecked") + @Test + public void testHandleOpportunisticContainerStatus() throws Exception{ + final DrainDispatcher dispatcher = new DrainDispatcher(); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + rm = new MockRM(conf){ + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + rm.start(); + RMApp app = rm.submitApp(1024, true); + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + SchedulerApplicationAttempt applicationAttempt = null; + while (applicationAttempt == null) { + applicationAttempt = + ((AbstractYarnScheduler)rm.getRMContext().getScheduler()) + .getApplicationAttempt(appAttemptId); + Thread.sleep(100); + } + + Resource currentConsumption = applicationAttempt.getCurrentConsumption(); + 
Assert.assertEquals(Resource.newInstance(0, 0), currentConsumption); + Resource allocResources = + applicationAttempt.getQueue().getMetrics().getAllocatedResources(); + Assert.assertEquals(Resource.newInstance(0, 0), allocResources); + + RegisterNodeManagerRequest req = Records.newRecord( + RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); + NMContainerStatus queuedOpp = + NMContainerStatus.newInstance(c1, 1, ContainerState.SCHEDULED, + Resource.newInstance(1024, 1), "Dummy Queued OC", + ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "", + ExecutionType.OPPORTUNISTIC); + NMContainerStatus runningOpp = + NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING, + Resource.newInstance(2048, 1), "Dummy Running OC", + ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "", + ExecutionType.OPPORTUNISTIC); + NMContainerStatus runningGuar = + NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING, + Resource.newInstance(2048, 1), "Dummy Running GC", + ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "", + ExecutionType.GUARANTEED); + req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar)); + // Register a valid node that reports the three containers above.
+ RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(req); + dispatcher.await(); + Thread.sleep(2000); + dispatcher.await(); + Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + + Collection<RMContainer> liveContainers = applicationAttempt + .getLiveContainers(); + Assert.assertEquals(3, liveContainers.size()); + Iterator<RMContainer> iter = liveContainers.iterator(); + while (iter.hasNext()) { + RMContainer rc = iter.next(); + Assert.assertEquals( + rc.getContainerId().equals(c3) ? + ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC, + rc.getExecutionType()); + } + + // Should only include GUARANTEED resources + currentConsumption = applicationAttempt.getCurrentConsumption(); + Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption); + allocResources = + applicationAttempt.getQueue().getMetrics().getAllocatedResources(); + Assert.assertEquals(Resource.newInstance(2048, 1), allocResources); + + SchedulerNode schedulerNode = + rm.getRMContext().getScheduler().getSchedulerNode(nodeId); + Assert.assertNotNull(schedulerNode); + Resource nodeResources = schedulerNode.getAllocatedResource(); + Assert.assertEquals(Resource.newInstance(2048, 1), nodeResources); + } + @Test(timeout = 60000) public void testNodeHeartBeatResponseForUnknownContainerCleanUp() throws Exception {