diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index d1e86de5da..1b72e4065d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -24,13 +24,13 @@ import java.util.Set; import java.util.HashMap; import java.util.Collections; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -76,6 +76,10 @@ public class UnmanagedAMPoolManager extends AbstractService { private ExecutorService threadpool; + private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread"; + + private Thread finishApplicationThread; + public UnmanagedAMPoolManager(ExecutorService threadpool) { super(UnmanagedAMPoolManager.class.getName()); this.threadpool = threadpool; @@ -96,48 +100,16 @@ protected void serviceStart() throws Exception { * UAMs running, force kill all of them. Do parallel kill because of * performance reasons. * - * TODO: move waiting for the kill to finish into a separate thread, without - * blocking the serviceStop. */ @Override protected void serviceStop() throws Exception { - ExecutorCompletionService completionService = - new ExecutorCompletionService<>(this.threadpool); - if (this.unmanagedAppMasterMap.isEmpty()) { - return; + + if (!this.unmanagedAppMasterMap.isEmpty()) { + finishApplicationThread = new Thread(createForceFinishApplicationThread()); + finishApplicationThread.setName(dispatcherThreadName); + finishApplicationThread.start(); } - // Save a local copy of the key set so that it won't change with the map - Set addressList = - new HashSet<>(this.unmanagedAppMasterMap.keySet()); - LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", - addressList.size()); - - for (final String uamId : addressList) { - completionService.submit(new Callable() { - @Override - public KillApplicationResponse call() throws Exception { - try { - LOG.info("Force-killing UAM id " + uamId + " for application " - + appIdMap.get(uamId)); - return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); - } catch (Exception e) { - LOG.error("Failed to kill unmanaged application master", e); - return null; - } - } - }); - } - - for (int i = 0; i < addressList.size(); ++i) { - try { - Future future = completionService.take(); - future.get(); - } catch (Exception e) { - LOG.error("Failed to kill unmanaged application master", e); - } - } - this.appIdMap.clear(); super.serviceStop(); } @@ -501,4 +473,62 @@ public Map batchFinishApplicationMaster return responseMap; } + + Runnable createForceFinishApplicationThread() { + return () -> { + + ExecutorCompletionService> completionService = + new ExecutorCompletionService<>(threadpool); + + // Save a local copy of the key set so that it won't change with the map + Set addressList = new HashSet<>(unmanagedAppMasterMap.keySet()); + + LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", addressList.size()); + + for (final String uamId : addressList) { + completionService.submit(() -> { + try { + ApplicationId appId = appIdMap.get(uamId); + LOG.info("Force-killing UAM id {} for application {}", uamId, appId); + UnmanagedApplicationManager applicationManager = unmanagedAppMasterMap.remove(uamId); + KillApplicationResponse response = applicationManager.forceKillApplication(); + return Pair.of(uamId, response); + } catch (Exception e) { + LOG.error("Failed to kill unmanaged application master", e); + return Pair.of(uamId, null); + } + }); + } + + for (int i = 0; i < addressList.size(); ++i) { + try { + Future> future = completionService.take(); + Pair pairs = future.get(); + String uamId = pairs.getLeft(); + ApplicationId appId = appIdMap.get(uamId); + KillApplicationResponse response = pairs.getRight(); + if (response == null) { + throw new YarnException( + "Failed Force-killing UAM id " + uamId + " for application " + appId); + } + LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.", + uamId, appId, response.getIsKillCompleted()); + } catch (Exception e) { + LOG.error("Failed to kill unmanaged application master", e); + } + } + + appIdMap.clear(); + }; + } + + @VisibleForTesting + protected Map getUnmanagedAppMasterMap() { + return unmanagedAppMasterMap; + } + + @VisibleForTesting + protected Thread getFinishApplicationThread() { + return finishApplicationThread; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index abb1d93c33..ddc85bf3e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -20,10 +20,16 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -39,6 +45,7 @@ import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -58,6 +65,9 @@ public class TestUnmanagedApplicationManager { private ApplicationAttemptId attemptId; + private UnmanagedAMPoolManager uamPool; + private ExecutorService threadpool; + @Before public void setup() { conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId"); @@ -69,6 +79,29 @@ public void setup() { uam = new TestableUnmanagedApplicationManager(conf, attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, "rm"); + + threadpool = Executors.newCachedThreadPool(); + uamPool = new TestableUnmanagedAMPoolManager(this.threadpool); + uamPool.init(conf); + uamPool.start(); + } + + @After + public void tearDown() throws IOException, InterruptedException { + if (uam != null) { + uam.shutDownConnections(); + uam = null; + } + if (uamPool != null) { + if (uamPool.isInState(Service.STATE.STARTED)) { + uamPool.stop(); + } + uamPool = null; + } + if (threadpool != null) { + threadpool.shutdownNow(); + threadpool = null; + } } protected void waitForCallBackCountAndCheckZeroPending( @@ -464,4 +497,49 @@ public Object run() { } } + protected class TestableUnmanagedAMPoolManager extends UnmanagedAMPoolManager { + public TestableUnmanagedAMPoolManager(ExecutorService threadpool) { + super(threadpool); + } + + @Override + public UnmanagedApplicationManager createUAM(Configuration configuration, + ApplicationId appId, String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts, String rmId) { + return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter, + appNameSuffix, keepContainersAcrossApplicationAttempts, rmId); + } + } + + @Test + public void testSeparateThreadWithoutBlockServiceStop() throws Exception { + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 1), 1); + Token token1 = uamPool.launchUAM("SC-1", this.conf, + attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1"); + Assert.assertNotNull(token1); + + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 2), 1); + Token token2 = uamPool.launchUAM("SC-2", this.conf, + attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2"); + Assert.assertNotNull(token2); + + Map unmanagedAppMasterMap = + uamPool.getUnmanagedAppMasterMap(); + Assert.assertNotNull(unmanagedAppMasterMap); + Assert.assertEquals(2, unmanagedAppMasterMap.size()); + + // try to stop uamPool + uamPool.stop(); + Assert.assertTrue(uamPool.waitForServiceToStop(2000)); + // process force finish Application in a separate thread, not blocking the main thread + Assert.assertEquals(Service.STATE.STOPPED, uamPool.getServiceState()); + + // Wait for the thread to terminate, check if uamPool#unmanagedAppMasterMap is 0 + Thread finishApplicationThread = uamPool.getFinishApplicationThread(); + GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(), + 100, 2000); + Assert.assertEquals(0, unmanagedAppMasterMap.size()); + } } \ No newline at end of file