diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c2be5d420b..0e317712f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5132,10 +5132,10 @@
yarn.router.interceptor.user-thread-pool.keep-alive-time
- 0s
+ 30s
This configurable is used to set the keepAliveTime of the thread pool of the interceptor.
- Default is 0s.
+ Default is 30s.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
index af0bd6184b..76380af8c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -95,6 +94,10 @@ public GPGContext getGPGContext() {
return this.gpgContext;
}
+ public FederationRegistryClient getRegistryClient() {
+ return this.registryClient;
+ }
+
/**
* Query router for applications.
*
@@ -152,18 +155,6 @@ public Set getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries");
}
- protected void cleanupAppRecordInRegistry(Set knownApps) {
- List allApps = this.registryClient.getAllApplications();
- LOG.info("Got {} existing apps in registry.", allApps.size());
- for (String app : allApps) {
- ApplicationId appId = ApplicationId.fromString(app);
- if (!knownApps.contains(appId)) {
- LOG.info("removing finished application entry for {}", app);
- this.registryClient.removeAppFromRegistry(appId, true);
- }
- }
- }
-
@Override
public abstract void run();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
index 5b2ff26fcf..c3f79d0284 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -45,33 +46,49 @@ public void run() {
LOG.info("Application cleaner run at time {}", now);
FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
- Set candidates = new HashSet<>();
try {
+ // Get the candidate list from StateStore before calling router
+ Set allStateStoreApps = new HashSet<>();
List response =
facade.getApplicationsHomeSubCluster();
for (ApplicationHomeSubCluster app : response) {
- candidates.add(app.getApplicationId());
+ allStateStoreApps.add(app.getApplicationId());
}
- LOG.info("{} app entries in FederationStateStore", candidates.size());
+ LOG.info("{} app entries in FederationStateStore", allStateStoreApps.size());
+ // Get the candidate list from Registry before calling router
+ List allRegistryApps = getRegistryClient().getAllApplications();
+ LOG.info("{} app entries in FederationRegistry", allStateStoreApps.size());
+
+ // Get the list of known apps from Router
Set routerApps = getRouterKnownApplications();
LOG.info("{} known applications from Router", routerApps.size());
- candidates.removeAll(routerApps);
- LOG.info("Deleting {} applications from statestore", candidates.size());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString)
- .collect(Collectors.joining(",")));
- }
- for (ApplicationId appId : candidates) {
+ // Clean up StateStore entries
+ Set toDelete =
+ Sets.difference(allStateStoreApps, routerApps);
+
+ LOG.info("Deleting {} applications from statestore", toDelete.size());
+ LOG.debug("Apps to delete: {}.",
+ toDelete.stream().map(Object::toString).collect(Collectors.joining(",")));
+
+ for (ApplicationId appId : toDelete) {
try {
+ LOG.debug("Deleting {} from statestore ", appId);
facade.deleteApplicationHomeSubCluster(appId);
} catch (Exception e) {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
}
}
- // Clean up registry entries
- cleanupAppRecordInRegistry(routerApps);
+
+ // Clean up Registry entries
+ for (String app : allRegistryApps) {
+ ApplicationId appId = ApplicationId.fromString(app);
+ if (!routerApps.contains(appId)) {
+ LOG.debug("removing finished application entry for {}", app);
+ getRegistryClient().removeAppFromRegistry(appId, true);
+ }
+ }
} catch (Throwable e) {
LOG.error("Application cleaner started at time {} fails. ", now, e);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
index 1e703b5196..c028bbdbe2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
@@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner {
// The list of applications returned by mocked router
private Set routerAppIds;
+ private ApplicationId appIdToAddConcurrently;
+
@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
@@ -111,6 +114,7 @@ public void setup() throws Exception {
new Token());
}
Assert.assertEquals(3, registryClient.getAllApplications().size());
+ appIdToAddConcurrently = null;
}
@After
@@ -159,7 +163,42 @@ public class TestableDefaultApplicationCleaner
extends DefaultApplicationCleaner {
@Override
public Set getAppsFromRouter() throws YarnRuntimeException {
+ if (appIdToAddConcurrently != null) {
+ SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
+ try {
+ ApplicationHomeSubCluster appHomeSubCluster =
+ ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, scId);
+ AddApplicationHomeSubClusterRequest request =
+ AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
+ stateStore.addApplicationHomeSubCluster(request);
+ } catch (YarnException e) {
+ throw new YarnRuntimeException(e);
+ }
+ registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, scId.toString(),
+ new Token<>());
+ }
return routerAppIds;
}
}
+
+ @Test
+ public void testConcurrentNewApp() throws YarnException {
+ appIdToAddConcurrently = ApplicationId.newInstance(1, 1);
+
+ appCleaner.run();
+
+ // The concurrently added app should be still there
+ GetApplicationsHomeSubClusterRequest appHomeSubClusterRequest =
+ GetApplicationsHomeSubClusterRequest.newInstance();
+ GetApplicationsHomeSubClusterResponse applicationsHomeSubCluster =
+ stateStore.getApplicationsHomeSubCluster(appHomeSubClusterRequest);
+ Assert.assertNotNull(applicationsHomeSubCluster);
+ List appsHomeSubClusters =
+ applicationsHomeSubCluster.getAppsHomeSubClusters();
+ Assert.assertNotNull(appsHomeSubClusters);
+ Assert.assertEquals(1, appsHomeSubClusters.size());
+
+ // The concurrently added app should be still there
+ Assert.assertEquals(1, registryClient.getAllApplications().size());
+ }
}