diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
index d78f8adf3f..2e24da6874 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java
@@ -63,6 +63,18 @@ public static OpportunisticSchedulerMetrics getMetrics() {
     return INSTANCE;
   }
 
+  @VisibleForTesting
+  public static void resetMetrics() {
+    synchronized (OpportunisticSchedulerMetrics.class) {
+      isInitialized.set(false);
+      INSTANCE = null;
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      if (ms != null) {
+        ms.unregisterSource("OpportunisticSchedulerMetrics");
+      }
+    }
+  }
+
   private static void registerMetrics() {
     registry = new MetricsRegistry(RECORD_INFO);
     registry.tag(RECORD_INFO, "ResourceManager");
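OpportunisticSchedulerMetrics is a JVM-wide singleton registered as a source with DefaultMetricsSystem, so without this hook a test that restarts the RM in the same JVM would observe counters left over from earlier runs, and re-registering the source would fail. A minimal teardown sketch, assuming a JUnit 4 test class with an rm field (it mirrors the stopRM() change to the test class later in this patch):

    @After
    public void tearDown() {
      if (rm != null) {
        rm.stop();
      }
      // Drop the singleton and its metrics-system registration so the
      // next test in this JVM starts from zero counters.
      OpportunisticSchedulerMetrics.resetMetrics();
    }
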
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 7ce7bfc96d..8772ddb44e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -584,8 +584,14 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
           rmContainer);
 
       // recover scheduler attempt
-      schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
+      final boolean recovered = schedulerAttempt.recoverContainer(
+          schedulerNode, rmContainer);
+      if (recovered && rmContainer.getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        OpportunisticSchedulerMetrics.getMetrics()
+            .incrAllocatedOppContainers(1);
+      }
 
       // set master container for the current running AMContainer for this
       // attempt.
       RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
@@ -720,7 +726,10 @@ public void completedContainer(RMContainer rmContainer,
       SchedulerApplicationAttempt schedulerAttempt =
           getCurrentAttemptForContainer(containerId);
       if (schedulerAttempt != null) {
-        schedulerAttempt.removeRMContainer(containerId);
+        if (schedulerAttempt.removeRMContainer(containerId)) {
+          OpportunisticSchedulerMetrics.getMetrics()
+              .incrReleasedOppContainers(1);
+        }
       }
       LOG.debug("Completed container: {} in state: {} event:{}",
           rmContainer.getContainerId(), rmContainer.getState(), event);
@@ -729,7 +738,6 @@
       if (node != null) {
         node.releaseContainer(rmContainer.getContainerId(), false);
       }
-      OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1);
     }
 
     // If the container is getting killed in ACQUIRED state, the requester (AM
@@ -1411,6 +1419,8 @@ private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
       RMContainer demotedRMContainer = createDemotedRMContainer(appAttempt,
           oppCntxt, rmContainer);
       if (demotedRMContainer != null) {
+        OpportunisticSchedulerMetrics.getMetrics()
+            .incrAllocatedOppContainers(1);
         appAttempt.addToNewlyDemotedContainers(
             uReq.getContainerId(), demotedRMContainer);
       }
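The completedContainer() hunks are the double-counting fix: this code path already handled only opportunistic containers (the pre-patch code incremented the opportunistic counter unconditionally here), but a completion event can be delivered more than once for the same container, for example once from a kill and once from a node heartbeat. A condensed sketch of the resulting invariant, using the names from the diff:

    // A release is counted exactly once per tracked container:
    // removeRMContainer() returns true only while the container is
    // still present in the attempt's live-container map.
    if (schedulerAttempt.removeRMContainer(containerId)) {
      OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1);
    }

Recovery is the matching credit side: incrAllocatedOppContainers(1) fires only when recoverContainer() reports a genuinely re-added OPPORTUNISTIC container, so the allocated and released counters stay consistent across an RM restart.
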
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index a4f0750166..c50ee56738 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -402,7 +402,13 @@ public void addRMContainer(
     }
   }
 
-  public void removeRMContainer(ContainerId containerId) {
+  /**
+   * Removes an RM container from the map of live containers
+   * related to this application attempt.
+   * @param containerId The container ID of the RMContainer to remove
+   * @return true if the container was present in the map and removed
+   */
+  public boolean removeRMContainer(ContainerId containerId) {
     writeLock.lock();
     try {
       RMContainer rmContainer = liveContainers.remove(containerId);
@@ -415,7 +421,11 @@ public void removeRMContainer(ContainerId containerId) {
           this.attemptResourceUsageAllocatedRemotely
               .decUsed(rmContainer.getAllocatedResource());
         }
+
+        return true;
       }
+
+      return false;
     } finally {
       writeLock.unlock();
     }
@@ -1226,7 +1236,7 @@ public void move(Queue newQueue) {
     }
   }
 
-  public void recoverContainer(SchedulerNode node,
+  public boolean recoverContainer(SchedulerNode node,
       RMContainer rmContainer) {
     writeLock.lock();
     try {
@@ -1234,7 +1244,7 @@ public void recoverContainer(SchedulerNode node,
       appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());
 
       if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-        return;
+        return false;
       }
       LOG.info("SchedulerAttempt " + getApplicationAttemptId()
           + " is recovering container " + rmContainer.getContainerId());
@@ -1244,6 +1254,8 @@ public void recoverContainer(SchedulerNode node,
           rmContainer.getContainer().getResource());
       }
 
+      return true;
+
       // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
       // is called.
       // newlyAllocatedContainers.add(rmContainer);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f5a0040f26..3b7055c757 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -666,11 +666,11 @@ private Container createContainer(FSSchedulerNode node, Resource capability,
   }
 
   @Override
-  public synchronized void recoverContainer(SchedulerNode node,
+  public synchronized boolean recoverContainer(SchedulerNode node,
       RMContainer rmContainer) {
     writeLock.lock();
     try {
-      super.recoverContainer(node, rmContainer);
+      final boolean recovered = super.recoverContainer(node, rmContainer);
 
       if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
         getQueue().incUsedResource(rmContainer.getContainer().getResource());
@@ -685,6 +685,8 @@ public synchronized void recoverContainer(SchedulerNode node,
         getQueue().addAMResourceUsage(resource);
         setAmRunning(true);
       }
+
+      return recovered;
     } finally {
       writeLock.unlock();
     }
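Note that the new return-value contract flows through overrides: FSAppAttempt must hand back the result of super.recoverContainer(), otherwise the metric accounting in AbstractYarnScheduler would silently break under the fair scheduler. A sketch of the pattern for any further subclass (hypothetical class, names assumed):

    @Override
    public boolean recoverContainer(SchedulerNode node,
        RMContainer rmContainer) {
      writeLock.lock();
      try {
        // The base class decides whether the container was actually
        // re-added; COMPLETED reports yield false.
        final boolean recovered = super.recoverContainer(node, rmContainer);
        // ... subclass-specific bookkeeping goes here ...
        return recovered;
      } finally {
        writeLock.unlock();
      }
    }
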
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 90c554361c..0aac2f1399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -534,6 +534,17 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores,
     return nm;
   }
 
+  public MockNM registerNode(String nodeIdStr, int memory, int vCores,
+      List<ApplicationId> runningApplications,
+      List<NMContainerStatus> containerStatuses) throws Exception {
+    MockNM nm =
+        new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
+            YarnVersionInfo.getVersion());
+    nm.registerNode(containerStatuses, runningApplications);
+    drainEventsImplicitly();
+    return nm;
+  }
+
   public MockNM registerNode(String nodeIdStr, Resource nodeCapability)
       throws Exception {
     MockNM nm = new MockNM(nodeIdStr, nodeCapability,
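The new MockRM overload lets a test node re-register with both its running-application list and per-container NMContainerStatus reports, the way a live NM re-syncs after an RM restart. A usage sketch, assuming appId and containerStatus are prepared as in the test added below:

    // Rejoin a node that claims one running container for appId; the RM
    // recovers the container and updates the opportunistic metrics.
    MockNM nm = rm.registerNode("h2:1234", 4096, 1,
        Collections.singletonList(appId),
        Collections.singletonList(containerStatus));
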
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 901dc8a143..b99c456fb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -72,6 +73,7 @@
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -105,6 +107,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -132,6 +135,11 @@ public void createAndStartRM() {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setInt(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.set(
+        YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     startRM(conf);
   }
@@ -165,6 +173,8 @@ public void stopRM() {
     if (rm != null) {
       rm.stop();
     }
+
+    OpportunisticSchedulerMetrics.resetMetrics();
   }
 
   @Test(timeout = 600000)
@@ -817,6 +827,130 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
         metrics.getAggregatedReleasedContainers());
   }
 
+  /**
+   * Tests that, if a node was running opportunistic containers while the RM
+   * was down, the RM reflects those containers in its metrics once the node
+   * re-registers after recovery.
+   */
+  @Test
+  public void testMetricsRetainsAllocatedOpportunisticAfterRMRestart()
+      throws Exception {
+    final MockRMAppSubmissionData appSubmissionData =
+        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build();
+
+    MockNM nm1 = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    final RMApp app = MockRMAppSubmitter.submit(rm, appSubmissionData);
+
+    final ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+
+    MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    final OpportunisticSchedulerMetrics metrics =
+        OpportunisticSchedulerMetrics.getMetrics();
+
+    // We start with ID 2, since the AM container is ID 1
+    final ContainerId recoverOContainerId2 = ContainerId.newContainerId(
+        appAttemptId, 2);
+
+    final Resource fakeResource = Resource.newInstance(1024, 1);
+    final String fakeDiagnostics = "recover container";
+    final Priority fakePriority = Priority.newInstance(0);
+
+    final NMContainerStatus recoverOContainerReport1 =
+        NMContainerStatus.newInstance(
+            recoverOContainerId2, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    // Make sure that the count starts at 0
+    Assert.assertEquals(0, metrics.getAllocatedContainers());
+
+    // Recover one OContainer only
+    rm.registerNode("h2:1234", 4096, 1,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Collections.singletonList(recoverOContainerReport1));
+
+    Assert.assertEquals(1, metrics.getAllocatedContainers());
+
+    // Recover two OContainers at once
+    final ContainerId recoverOContainerId3 = ContainerId.newContainerId(
+        appAttemptId, 3);
+
+    final ContainerId recoverOContainerId4 = ContainerId.newContainerId(
+        appAttemptId, 4);
+
+    final NMContainerStatus recoverOContainerReport2 =
+        NMContainerStatus.newInstance(
+            recoverOContainerId3, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    final NMContainerStatus recoverOContainerReport3 =
+        NMContainerStatus.newInstance(
+            recoverOContainerId4, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    rm.registerNode(
+        "h3:1234", 4096, 10,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Arrays.asList(recoverOContainerReport2, recoverOContainerReport3));
+
+    Assert.assertEquals(3, metrics.getAllocatedContainers());
+
+    // Make sure that the recovered GContainer
+    // does not increment the OContainer count
+    final ContainerId recoverGContainerId = ContainerId.newContainerId(
+        appAttemptId, 5);
+
+    final NMContainerStatus recoverGContainerReport =
+        NMContainerStatus.newInstance(
+            recoverGContainerId, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.GUARANTEED, -1);
+
+    rm.registerNode(
+        "h4:1234", 4096, 10,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Collections.singletonList(recoverGContainerReport));
+
+    Assert.assertEquals(3, metrics.getAllocatedContainers());
+
+    final ContainerId completedOContainerId = ContainerId.newContainerId(
+        appAttemptId, 6);
+
+    final NMContainerStatus completedOContainerReport =
+        NMContainerStatus.newInstance(
+            completedOContainerId, 0, ContainerState.COMPLETE,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    // Make sure that completed containers are not recorded
+    rm.registerNode(
+        "h5:1234", 4096, 10,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Collections.singletonList(completedOContainerReport));
+
+    Assert.assertEquals(3, metrics.getAllocatedContainers());
+  }
+
   @Test(timeout = 60000)
   public void testAMCrashDuringAllocate() throws Exception {
     MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());