From 62e447610208919a00ecdf8eb99ad498689bbb05 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 17 Jun 2022 16:38:36 -0700 Subject: [PATCH] YARN-10122. Support signalToContainer API for Federation. (#4421) --- .../yarn/server/router/RouterMetrics.java | 33 ++++++++++++++ .../clientrm/FederationClientInterceptor.java | 38 +++++++++++++++- .../yarn/server/router/TestRouterMetrics.java | 33 ++++++++++++++ .../TestFederationClientInterceptor.java | 45 +++++++++++++++++++ .../TestableFederationClientInterceptor.java | 11 ++++- 5 files changed, 158 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index b02b3e155f..ac37c4ed1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -81,6 +81,8 @@ public final class RouterMetrics { private MutableGaugeInt numUpdateAppPriorityFailedRetrieved; @Metric("# of updateApplicationPriority failed to be retrieved") private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved; + @Metric("# of signalToContainer failed to be retrieved") + private MutableGaugeInt numSignalToContainerFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -126,6 +128,8 @@ public final class RouterMetrics { private MutableRate totalSucceededUpdateAppPriorityRetrieved; @Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)") private MutableRate totalSucceededUpdateAppTimeoutsRetrieved; + @Metric("Total number of successful Retrieved signalToContainer and latency(ms)") + private MutableRate totalSucceededSignalToContainerRetrieved; /** * Provide quantile counters for all latencies. @@ -150,6 +154,7 @@ public final class RouterMetrics { private MutableQuantiles failAppAttemptLatency; private MutableQuantiles updateAppPriorityLatency; private MutableQuantiles updateAppTimeoutsLatency; + private MutableQuantiles signalToContainerLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; @@ -228,6 +233,10 @@ private RouterMetrics() { updateAppTimeoutsLatency = registry.newQuantiles("updateApplicationTimeoutsLatency", "latency of update application timeouts", "ops", "latency", 10); + + signalToContainerLatency = + registry.newQuantiles("signalToContainerLatency", + "latency of signal to container timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -349,6 +358,11 @@ public long getNumSucceededUpdateAppTimeoutsRetrieved() { return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededSignalToContainerRetrieved() { + return totalSucceededSignalToContainerRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -449,6 +463,11 @@ public double getLatencySucceededUpdateAppTimeoutsRetrieved() { return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededSignalToContainerRetrieved() { + return totalSucceededSignalToContainerRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -549,6 +568,11 @@ public int getUpdateApplicationTimeoutsFailedRetrieved() { return numUpdateAppTimeoutsFailedRetrieved.value(); } + @VisibleForTesting + public int getSignalToContainerFailedRetrieved() { + return numSignalToContainerFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -649,6 +673,11 @@ public void succeededUpdateAppTimeoutsRetrieved(long duration) { updateAppTimeoutsLatency.add(duration); } + public void succeededSignalToContainerRetrieved(long duration) { + totalSucceededSignalToContainerRetrieved.add(duration); + signalToContainerLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -728,4 +757,8 @@ public void incrUpdateAppPriorityFailedRetrieved() { public void incrUpdateApplicationTimeoutsRetrieved() { numUpdateAppTimeoutsFailedRetrieved.incr(); } + + public void incrSignalToContainerFailedRetrieved() { + numSignalToContainerFailedRetrieved.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index fec62d4b08..6cc317242c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1304,7 +1304,43 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( @Override public SignalContainerResponse signalToContainer( SignalContainerRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getContainerId() == null + || request.getCommand() == null) { + routerMetrics.incrSignalToContainerFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing signalToContainer request or containerId " + + "or command information.", null); + } + + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = + request.getContainerId().getApplicationAttemptId().getApplicationId(); + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException ex) { + routerMetrics.incrSignalToContainerFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + applicationId + + " does not exist in FederationStateStore.", ex); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + SignalContainerResponse response = null; + try { + response = clientRMProxy.signalToContainer(request); + } catch (Exception ex) { + RouterServerUtil.logAndThrowException("Unable to signal to container for " + + applicationId + " from SubCluster " + subClusterId.getId(), ex); + } + + if (response == null) { + LOG.error("No response when signal to container of " + + "the applicationId {} to SubCluster {}.", applicationId, subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime); + return response; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 4b1049e8b6..eddd2a0ab4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -413,6 +413,11 @@ public void getUpdateApplicationTimeouts() { LOG.info("Mocked: failed updateApplicationTimeouts call"); metrics.incrUpdateApplicationTimeoutsRetrieved(); } + + public void getSignalContainer() { + LOG.info("Mocked: failed signalContainer call"); + metrics.incrSignalToContainerFailedRetrieved(); + } } // Records successes for all calls @@ -523,6 +528,11 @@ public void getUpdateApplicationTimeouts(long duration) { LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration); metrics.succeededUpdateAppTimeoutsRetrieved(duration); } + + public void getSignalToContainerTimeouts(long duration) { + LOG.info("Mocked: successful signalToContainer call with duration {}", duration); + metrics.succeededSignalToContainerRetrieved(duration); + } } @Test @@ -806,4 +816,27 @@ public void testUpdateAppTimeoutsFailed() { metrics.getUpdateApplicationTimeoutsFailedRetrieved()); } + @Test + public void testSucceededSignalToContainerRetrieved() { + long totalGoodBefore = metrics.getNumSucceededSignalToContainerRetrieved(); + goodSubCluster.getSignalToContainerTimeouts(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededSignalToContainerRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getSignalToContainerTimeouts(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededSignalToContainerRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testSignalToContainerFailed() { + long totalBadBefore = metrics.getSignalToContainerFailedRetrieved(); + badSubCluster.getSignalContainer(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getSignalToContainerFailedRetrieved()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 9ead9fbe72..3037738240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -1056,4 +1060,45 @@ public void testUpdateApplicationTimeouts() throws Exception { Assert.assertNotNull(timeoutsResponse); Assert.assertEquals(appTimeout, responseTimeOut); } + + @Test + public void testSignalContainer() throws Exception { + LOG.info("Test FederationClientInterceptor : Signal Container request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " + + "or containerId or command information.", () -> interceptor.signalToContainer(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + MockNM nm = interceptor.getMockNMs().get(subClusterId); + nm.nodeHeartbeat(true); + mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); + mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); + + ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId(); + + SignalContainerRequest signalContainerRequest = + SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN); + SignalContainerResponse signalContainerResponse = + interceptor.signalToContainer(signalContainerRequest); + + Assert.assertNotNull(signalContainerResponse); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index 202a286696..af1f45924c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -51,6 +52,9 @@ public class TestableFederationClientInterceptor private ConcurrentHashMap mockRMs = new ConcurrentHashMap<>(); + private ConcurrentHashMap mockNMs = + new ConcurrentHashMap<>(); + private List badSubCluster = new ArrayList(); @Override @@ -71,7 +75,8 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( mockRM.init(super.getConf()); mockRM.start(); try { - mockRM.registerNode("h1:1234", 1024); + MockNM nm = mockRM.registerNode("127.0.0.1:1234", 8*1024, 4); + mockNMs.put(subClusterId, nm); } catch (Exception e) { Assert.fail(e.getMessage()); } @@ -118,4 +123,8 @@ protected void registerBadSubCluster(SubClusterId badSC) throws IOException { public ConcurrentHashMap getMockRMs() { return mockRMs; } + + public ConcurrentHashMap getMockNMs() { + return mockNMs; + } }