YARN-9013. [BackPort] [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. (#6147) Contributed by Botong Huang, Shilun Fan.

Co-authored-by: Botong Huang <botong@apache.org>
Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
slfan1989 2023-10-31 06:56:00 +08:00 committed by GitHub
parent a079f6261d
commit 254dbab5a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 27 deletions

View File

@ -5132,10 +5132,10 @@
<property> <property>
<name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name> <name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name>
<value>0s</value> <value>30s</value>
<description> <description>
This configurable is used to set the keepAliveTime of the thread pool of the interceptor. This configurable is used to set the keepAliveTime of the thread pool of the interceptor.
Default is 0s. Default is 30s.
</description> </description>
</property> </property>

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.commons.lang3.time.DurationFormatUtils;
@ -95,6 +94,10 @@ public GPGContext getGPGContext() {
return this.gpgContext; return this.gpgContext;
} }
/**
 * Accessor for the registry client held by this cleaner.
 *
 * @return the {@link FederationRegistryClient} stored in this instance
 */
public FederationRegistryClient getRegistryClient() {
  return registryClient;
}
/** /**
* Query router for applications. * Query router for applications.
* *
@ -152,18 +155,6 @@ public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries"); + " success Router queries after " + totalAttemptCount + " retries");
} }
/**
 * Walks every application currently listed in the registry and removes the
 * entries whose id is not contained in {@code knownApps}.
 *
 * @param knownApps application ids whose registry entries must be kept
 */
protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
  List<String> registryApps = this.registryClient.getAllApplications();
  LOG.info("Got {} existing apps in registry.", registryApps.size());
  for (String registryApp : registryApps) {
    ApplicationId id = ApplicationId.fromString(registryApp);
    if (knownApps.contains(id)) {
      // Still known: leave its registry entry untouched.
      continue;
    }
    LOG.info("removing finished application entry for {}", registryApp);
    // NOTE(review): meaning of the boolean flag is defined by FederationRegistryClient - confirm.
    this.registryClient.removeAppFromRegistry(id, true);
  }
}
@Override @Override
public abstract void run(); public abstract void run();
} }

View File

@ -24,6 +24,7 @@
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -45,33 +46,49 @@ public void run() {
LOG.info("Application cleaner run at time {}", now); LOG.info("Application cleaner run at time {}", now);
FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade(); FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
Set<ApplicationId> candidates = new HashSet<>();
try { try {
// Get the candidate list from StateStore before calling router
Set<ApplicationId> allStateStoreApps = new HashSet<>();
List<ApplicationHomeSubCluster> response = List<ApplicationHomeSubCluster> response =
facade.getApplicationsHomeSubCluster(); facade.getApplicationsHomeSubCluster();
for (ApplicationHomeSubCluster app : response) { for (ApplicationHomeSubCluster app : response) {
candidates.add(app.getApplicationId()); allStateStoreApps.add(app.getApplicationId());
} }
LOG.info("{} app entries in FederationStateStore", candidates.size()); LOG.info("{} app entries in FederationStateStore", allStateStoreApps.size());
// Get the candidate list from Registry before calling router
List<String> allRegistryApps = getRegistryClient().getAllApplications();
// Report the size of the Registry list fetched on the previous line
LOG.info("{} app entries in FederationRegistry", allRegistryApps.size());
// Get the list of known apps from Router
Set<ApplicationId> routerApps = getRouterKnownApplications(); Set<ApplicationId> routerApps = getRouterKnownApplications();
LOG.info("{} known applications from Router", routerApps.size()); LOG.info("{} known applications from Router", routerApps.size());
candidates.removeAll(routerApps); // Clean up StateStore entries
LOG.info("Deleting {} applications from statestore", candidates.size()); Set<ApplicationId> toDelete =
if (LOG.isDebugEnabled()) { Sets.difference(allStateStoreApps, routerApps);
LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString)
.collect(Collectors.joining(","))); LOG.info("Deleting {} applications from statestore", toDelete.size());
} LOG.debug("Apps to delete: {}.",
for (ApplicationId appId : candidates) { toDelete.stream().map(Object::toString).collect(Collectors.joining(",")));
for (ApplicationId appId : toDelete) {
try { try {
LOG.debug("Deleting {} from statestore ", appId);
facade.deleteApplicationHomeSubCluster(appId); facade.deleteApplicationHomeSubCluster(appId);
} catch (Exception e) { } catch (Exception e) {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e); LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
} }
} }
// Clean up registry entries
cleanupAppRecordInRegistry(routerApps); // Clean up Registry entries
for (String app : allRegistryApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!routerApps.contains(appId)) {
LOG.debug("removing finished application entry for {}", app);
getRegistryClient().removeAppFromRegistry(appId, true);
}
}
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("Application cleaner started at time {} fails. ", now, e); LOG.error("Application cleaner started at time {} fails. ", now, e);
} }

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner {
// The list of applications returned by mocked router // The list of applications returned by mocked router
private Set<ApplicationId> routerAppIds; private Set<ApplicationId> routerAppIds;
private ApplicationId appIdToAddConcurrently;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
@ -111,6 +114,7 @@ public void setup() throws Exception {
new Token<AMRMTokenIdentifier>()); new Token<AMRMTokenIdentifier>());
} }
Assert.assertEquals(3, registryClient.getAllApplications().size()); Assert.assertEquals(3, registryClient.getAllApplications().size());
appIdToAddConcurrently = null;
} }
@After @After
@ -159,7 +163,42 @@ public class TestableDefaultApplicationCleaner
extends DefaultApplicationCleaner { extends DefaultApplicationCleaner {
@Override @Override
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException { public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
if (appIdToAddConcurrently != null) {
SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
try {
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, scId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
stateStore.addApplicationHomeSubCluster(request);
} catch (YarnException e) {
throw new YarnRuntimeException(e);
}
registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, scId.toString(),
new Token<>());
}
return routerAppIds; return routerAppIds;
} }
} }
@Test
public void testConcurrentNewApp() throws YarnException {
  // Setting this id makes the testable cleaner's getAppsFromRouter() add the app
  // to both the state store and the registry while the cleaner is running.
  appIdToAddConcurrently = ApplicationId.newInstance(1, 1);

  appCleaner.run();

  // The concurrently added app should still be in the state store
  GetApplicationsHomeSubClusterResponse storeResponse =
      stateStore.getApplicationsHomeSubCluster(
          GetApplicationsHomeSubClusterRequest.newInstance());
  Assert.assertNotNull(storeResponse);
  List<ApplicationHomeSubCluster> remainingApps = storeResponse.getAppsHomeSubClusters();
  Assert.assertNotNull(remainingApps);
  Assert.assertEquals(1, remainingApps.size());

  // The concurrently added app should still be in the registry
  Assert.assertEquals(1, registryClient.getAllApplications().size());
}
} }