YARN-11284. [Federation] Improve UnmanagedAMPoolManager WithoutBlock ServiceStop (#4814)

This commit is contained in:
slfan1989 2022-09-03 01:28:38 +08:00 committed by GitHub
parent 3a96de7756
commit b266f852d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 146 additions and 38 deletions

View File

@ -24,13 +24,13 @@
import java.util.Set; import java.util.Set;
import java.util.HashMap; import java.util.HashMap;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -76,6 +76,10 @@ public class UnmanagedAMPoolManager extends AbstractService {
private ExecutorService threadpool; private ExecutorService threadpool;
private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread";
private Thread finishApplicationThread;
public UnmanagedAMPoolManager(ExecutorService threadpool) { public UnmanagedAMPoolManager(ExecutorService threadpool) {
super(UnmanagedAMPoolManager.class.getName()); super(UnmanagedAMPoolManager.class.getName());
this.threadpool = threadpool; this.threadpool = threadpool;
@ -96,48 +100,16 @@ protected void serviceStart() throws Exception {
* UAMs running, force kill all of them. Do parallel kill because of * UAMs running, force kill all of them. Do parallel kill because of
* performance reasons. * performance reasons.
* *
* TODO: move waiting for the kill to finish into a separate thread, without
* blocking the serviceStop.
*/ */
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
ExecutorCompletionService<KillApplicationResponse> completionService =
new ExecutorCompletionService<>(this.threadpool); if (!this.unmanagedAppMasterMap.isEmpty()) {
if (this.unmanagedAppMasterMap.isEmpty()) { finishApplicationThread = new Thread(createForceFinishApplicationThread());
return; finishApplicationThread.setName(dispatcherThreadName);
finishApplicationThread.start();
} }
// Save a local copy of the key set so that it won't change with the map
Set<String> addressList =
new HashSet<>(this.unmanagedAppMasterMap.keySet());
LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
addressList.size());
for (final String uamId : addressList) {
completionService.submit(new Callable<KillApplicationResponse>() {
@Override
public KillApplicationResponse call() throws Exception {
try {
LOG.info("Force-killing UAM id " + uamId + " for application "
+ appIdMap.get(uamId));
return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
return null;
}
}
});
}
for (int i = 0; i < addressList.size(); ++i) {
try {
Future<KillApplicationResponse> future = completionService.take();
future.get();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
}
}
this.appIdMap.clear();
super.serviceStop(); super.serviceStop();
} }
@ -501,4 +473,62 @@ public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster
return responseMap; return responseMap;
} }
Runnable createForceFinishApplicationThread() {
return () -> {
ExecutorCompletionService<Pair<String, KillApplicationResponse>> completionService =
new ExecutorCompletionService<>(threadpool);
// Save a local copy of the key set so that it won't change with the map
Set<String> addressList = new HashSet<>(unmanagedAppMasterMap.keySet());
LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", addressList.size());
for (final String uamId : addressList) {
completionService.submit(() -> {
try {
ApplicationId appId = appIdMap.get(uamId);
LOG.info("Force-killing UAM id {} for application {}", uamId, appId);
UnmanagedApplicationManager applicationManager = unmanagedAppMasterMap.remove(uamId);
KillApplicationResponse response = applicationManager.forceKillApplication();
return Pair.of(uamId, response);
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
return Pair.of(uamId, null);
}
});
}
for (int i = 0; i < addressList.size(); ++i) {
try {
Future<Pair<String, KillApplicationResponse>> future = completionService.take();
Pair<String, KillApplicationResponse> pairs = future.get();
String uamId = pairs.getLeft();
ApplicationId appId = appIdMap.get(uamId);
KillApplicationResponse response = pairs.getRight();
if (response == null) {
throw new YarnException(
"Failed Force-killing UAM id " + uamId + " for application " + appId);
}
LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.",
uamId, appId, response.getIsKillCompleted());
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
}
}
appIdMap.clear();
};
}
@VisibleForTesting
protected Map<String, UnmanagedApplicationManager> getUnmanagedAppMasterMap() {
return unmanagedAppMasterMap;
}
@VisibleForTesting
protected Thread getFinishApplicationThread() {
return finishApplicationThread;
}
} }

View File

@ -20,10 +20,16 @@
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -39,6 +45,7 @@
import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.AsyncCallback;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -58,6 +65,9 @@ public class TestUnmanagedApplicationManager {
private ApplicationAttemptId attemptId; private ApplicationAttemptId attemptId;
private UnmanagedAMPoolManager uamPool;
private ExecutorService threadpool;
@Before @Before
public void setup() { public void setup() {
conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId"); conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId");
@ -69,6 +79,29 @@ public void setup() {
uam = new TestableUnmanagedApplicationManager(conf, uam = new TestableUnmanagedApplicationManager(conf,
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
"rm"); "rm");
threadpool = Executors.newCachedThreadPool();
uamPool = new TestableUnmanagedAMPoolManager(this.threadpool);
uamPool.init(conf);
uamPool.start();
}
@After
public void tearDown() throws IOException, InterruptedException {
if (uam != null) {
uam.shutDownConnections();
uam = null;
}
if (uamPool != null) {
if (uamPool.isInState(Service.STATE.STARTED)) {
uamPool.stop();
}
uamPool = null;
}
if (threadpool != null) {
threadpool.shutdownNow();
threadpool = null;
}
} }
protected void waitForCallBackCountAndCheckZeroPending( protected void waitForCallBackCountAndCheckZeroPending(
@ -464,4 +497,49 @@ public Object run() {
} }
} }
protected class TestableUnmanagedAMPoolManager extends UnmanagedAMPoolManager {
public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
super(threadpool);
}
@Override
public UnmanagedApplicationManager createUAM(Configuration configuration,
ApplicationId appId, String queueName, String submitter, String appNameSuffix,
boolean keepContainersAcrossApplicationAttempts, String rmId) {
return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter,
appNameSuffix, keepContainersAcrossApplicationAttempts, rmId);
}
}
@Test
public void testSeparateThreadWithoutBlockServiceStop() throws Exception {
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 1), 1);
Token<AMRMTokenIdentifier> token1 = uamPool.launchUAM("SC-1", this.conf,
attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1");
Assert.assertNotNull(token1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 2), 1);
Token<AMRMTokenIdentifier> token2 = uamPool.launchUAM("SC-2", this.conf,
attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2");
Assert.assertNotNull(token2);
Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap =
uamPool.getUnmanagedAppMasterMap();
Assert.assertNotNull(unmanagedAppMasterMap);
Assert.assertEquals(2, unmanagedAppMasterMap.size());
// try to stop uamPool
uamPool.stop();
Assert.assertTrue(uamPool.waitForServiceToStop(2000));
// process force finish Application in a separate thread, not blocking the main thread
Assert.assertEquals(Service.STATE.STOPPED, uamPool.getServiceState());
// Wait for the thread to terminate, check if uamPool#unmanagedAppMasterMap is 0
Thread finishApplicationThread = uamPool.getFinishApplicationThread();
GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(),
100, 2000);
Assert.assertEquals(0, unmanagedAppMasterMap.size());
}
} }