YARN-8862. [BackPort] [GPG] Add Yarn Registry cleanup in ApplicationCleaner. (#6083) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
slfan1989 2023-09-29 07:15:53 +08:00 committed by GitHub
parent 35c42e4039
commit 1d2afc5cf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 159 additions and 32 deletions

View File

@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.api.BindFlags;
@ -142,9 +143,7 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
// Then update the subClusterTokenMap
subClusterTokenMap.put(subClusterId, token);
} catch (YarnException | IOException e) {
LOG.error(
"Failed writing AMRMToken to registry for subcluster " + subClusterId,
e);
LOG.error("Failed writing AMRMToken to registry for subcluster {}.", subClusterId, e);
}
return update;
}
@ -189,8 +188,7 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
retMap.put(scId, amrmToken);
} catch (Exception e) {
LOG.error("Failed reading registry key " + key
+ ", skipping subcluster " + scId, e);
LOG.error("Failed reading registry key {}, skipping subcluster {}.", key, scId, e);
}
}
@ -202,24 +200,39 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
/**
* Remove an application from registry.
*
* @param appId application id
* @param appId application id.
*/
public synchronized void removeAppFromRegistry(ApplicationId appId) {
removeAppFromRegistry(appId, false);
}
/**
* Remove an application from registry.
*
* @param appId application id
* @param ignoreMemoryState whether to ignore the memory data in terms of
* known application
*/
public synchronized void removeAppFromRegistry(ApplicationId appId,
boolean ignoreMemoryState) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
LOG.info("Removing all registry entries for {}", appId);
if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
return;
if (!ignoreMemoryState) {
if (MapUtils.isEmpty(subClusterTokenMap)) {
return;
}
}
LOG.info("Removing all registry entries for {}.", appId);
// Lastly remove the application directory
String key = getRegistryKey(appId, null);
try {
removeKeyRegistry(this.registry, this.user, key, true, true);
subClusterTokenMap.clear();
if (subClusterTokenMap != null) {
subClusterTokenMap.clear();
}
} catch (YarnException e) {
LOG.error("Failed removing registry directory key " + key, e);
LOG.error("Failed removing registry directory key {}.", key, e);
}
}
@ -247,7 +260,7 @@ public String run() {
}
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry resolve key " + key + " failed", e);
LOG.error("Registry resolve key {} failed.", key, e);
}
}
return null;
@ -271,7 +284,7 @@ public Boolean run() {
return true;
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry remove key " + key + " failed", e);
LOG.error("Registry remove key {} failed.", key, e);
}
}
return false;
@ -300,7 +313,7 @@ public Boolean run() {
return true;
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry write key " + key + " failed", e);
LOG.error("Registry write key {} failed.", key, e);
}
}
return false;
@ -317,18 +330,15 @@ public Boolean run() {
private List<String> listDirRegistry(final RegistryOperations registryImpl,
UserGroupInformation ugi, final String key, final boolean throwIfFails)
throws YarnException {
List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
@Override
public List<String> run() {
try {
return registryImpl.list(key);
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry list key " + key + " failed", e);
}
List<String> result = ugi.doAs((PrivilegedAction<List<String>>) () -> {
try {
return registryImpl.list(key);
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry list key {} failed.", key, e);
}
return null;
}
return null;
});
if (result == null && throwIfFails) {
throw new YarnException("Registry list key " + key + " failed");

View File

@ -87,4 +87,31 @@ public void testBasicCase() {
this.registryClient.loadStateFromRegistry(appId).size());
}
@Test
public void testRemoveWithMemoryState() {
ApplicationId appId1 = ApplicationId.newInstance(0, 0);
ApplicationId appId2 = ApplicationId.newInstance(0, 1);
String scId0 = "subcluster0";
this.registryClient.writeAMRMTokenForUAM(appId1, scId0, new Token<>());
this.registryClient.writeAMRMTokenForUAM(appId2, scId0, new Token<>());
Assert.assertEquals(2, this.registryClient.getAllApplications().size());
// Create a new client instance
this.registryClient =
new FederationRegistryClient(this.conf, this.registry, this.user);
this.registryClient.loadStateFromRegistry(appId2);
// Should remove app2
this.registryClient.removeAppFromRegistry(appId2, false);
Assert.assertEquals(1, this.registryClient.getAllApplications().size());
// Should not remove app1 since memory state don't have it
this.registryClient.removeAppFromRegistry(appId1, false);
Assert.assertEquals(1, this.registryClient.getAllApplications().size());
// Should remove app1
this.registryClient.removeAppFromRegistry(appId1, true);
Assert.assertEquals(0, this.registryClient.getAllApplications().size());
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
/**
@ -32,4 +33,8 @@ public interface GPGContext {
GPGPolicyFacade getPolicyFacade();
void setPolicyFacade(GPGPolicyFacade facade);
FederationRegistryClient getRegistryClient();
void setRegistryClient(FederationRegistryClient client);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
/**
@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext {
private FederationStateStoreFacade facade;
private GPGPolicyFacade policyFacade;
private FederationRegistryClient registryClient;
@Override
public FederationStateStoreFacade getStateStoreFacade() {
@ -48,4 +50,14 @@ public GPGPolicyFacade getPolicyFacade(){
public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
policyFacade = gpgPolicyfacade;
}
@Override
public FederationRegistryClient getRegistryClient() {
return registryClient;
}
@Override
public void setRegistryClient(FederationRegistryClient client) {
registryClient = client;
}
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
@ -46,6 +47,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
@ -81,6 +83,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Federation Variables
private GPGContext gpgContext;
private RegistryOperations registry;
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
@ -123,6 +126,17 @@ protected void serviceInit(Configuration conf) throws Exception {
new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf);
this.gpgContext.setPolicyFacade(gpgPolicyFacade);
this.registry = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.YARN_REGISTRY_CLASS,
YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
RegistryOperations.class);
this.registry.init(conf);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
FederationRegistryClient registryClient =
new FederationRegistryClient(conf, this.registry, user);
this.gpgContext.setRegistryClient(registryClient);
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
@ -157,6 +171,8 @@ protected void serviceStart() throws Exception {
super.serviceStart();
this.registry.start();
// Schedule SubClusterCleaner service
Configuration config = getConfig();
long scCleanerIntervalMs = config.getTimeDuration(
@ -214,6 +230,11 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
if (this.registry != null) {
this.registry.stop();
this.registry = null;
}
try {
if (this.scheduledExecutorService != null
&& !this.scheduledExecutorService.isShutdown()) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
@ -27,9 +28,11 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable {
private Configuration conf;
private GPGContext gpgContext;
private FederationRegistryClient registryClient;
private int minRouterSuccessCount;
private int maxRouterRetry;
@ -56,6 +60,7 @@ public void init(Configuration config, GPGContext context)
this.gpgContext = context;
this.conf = config;
this.registryClient = context.getRegistryClient();
String routerSpecString =
this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
@ -80,10 +85,9 @@ public void init(Configuration config, GPGContext context)
+ this.minRouterSuccessCount + " should be positive");
}
LOG.info(
"Initialized AppCleaner with Router query with min success {}, "
+ "max retry {}, retry interval {}",
this.minRouterSuccessCount, this.maxRouterRetry,
LOG.info("Initialized AppCleaner with Router query with min success {}, " +
"max retry {}, retry interval {}.", this.minRouterSuccessCount,
this.maxRouterRetry,
DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
}
@ -100,9 +104,9 @@ public GPGContext getGPGContext() {
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
LOG.info(String.format("Contacting router at: %s", webAppAddress));
AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf,
DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
LOG.info("Contacting router at: {}.", webAppAddress);
AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.APPS,
AppsInfo.class, conf, DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
Set<ApplicationId> appSet = new HashSet<>();
for (AppInfo appInfo : appsInfo.getApps()) {
@ -148,6 +152,18 @@ public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries");
}
protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
List<String> allApps = this.registryClient.getAllApplications();
LOG.info("Got {} existing apps in registry.", allApps.size());
for (String app : allApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!knownApps.contains(appId)) {
LOG.info("removing finished application entry for {}", app);
this.registryClient.removeAppFromRegistry(appId, true);
}
}
}
@Override
public abstract void run();
}

View File

@ -70,6 +70,8 @@ public void run() {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
}
}
// Clean up registry entries
cleanupAppRecordInRegistry(routerApps);
} catch (Throwable e) {
LOG.error("Application cleaner started at time {} fails. ", now, e);
}

View File

@ -24,15 +24,21 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
@ -50,6 +56,8 @@ public class TestDefaultApplicationCleaner {
private FederationStateStoreFacade facade;
private ApplicationCleaner appCleaner;
private GPGContext gpgContext;
private RegistryOperations registry;
private FederationRegistryClient registryClient;
private List<ApplicationId> appIds;
// The list of applications returned by mocked router
@ -68,8 +76,18 @@ public void setup() throws Exception {
facade = FederationStateStoreFacade.getInstance();
facade.reinitialize(stateStore, conf);
registry = new FSRegistryOperationsService();
registry.init(conf);
registry.start();
UserGroupInformation user = UserGroupInformation.getCurrentUser();
registryClient = new FederationRegistryClient(conf, registry, user);
registryClient.cleanAllApplications();
Assert.assertEquals(0, registryClient.getAllApplications().size());
gpgContext = new GPGContextImpl();
gpgContext.setStateStoreFacade(facade);
gpgContext.setRegistryClient(registryClient);
appCleaner = new TestableDefaultApplicationCleaner();
appCleaner.init(conf, gpgContext);
@ -87,7 +105,12 @@ public void setup() throws Exception {
stateStore.addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest.newInstance(
ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
// Write some registry entries for the app
registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(),
new Token<AMRMTokenIdentifier>());
}
Assert.assertEquals(3, registryClient.getAllApplications().size());
}
@After
@ -96,6 +119,14 @@ public void breakDown() {
stateStore.close();
stateStore = null;
}
if (registryClient != null) {
registryClient.cleanAllApplications();
registryClient = null;
}
if (registry != null) {
registry.stop();
registry = null;
}
}
@Test
@ -116,6 +147,9 @@ public void testFederationStateStoreAppsCleanUp() throws YarnException {
.getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest.newInstance())
.getAppsHomeSubClusters().size());
// The known app should not be cleaned in registry
Assert.assertEquals(1, registryClient.getAllApplications().size());
}
/**