YARN-6648. BackPort [GPG] Add SubClusterCleaner in Global Policy Generator. (#5676)

This commit is contained in:
slfan1989 2023-06-13 06:18:43 +08:00 committed by GitHub
parent fb16e00da0
commit a409d52ef8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 374 additions and 5 deletions

View File

@ -387,6 +387,11 @@
<Method name="initAndStartNodeManager" /> <Method name="initAndStartNodeManager" />
<Bug pattern="DM_EXIT" /> <Bug pattern="DM_EXIT" />
</Match> </Match>
<!-- Suppress DM_EXIT for GlobalPolicyGenerator#startGPG -->
<Match>
  <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator" />
  <Method name="startGPG" />
  <Bug pattern="DM_EXIT" />
</Match>
<!-- Ignore heartbeat exception when killing localizer --> <!-- Ignore heartbeat exception when killing localizer -->
<Match> <Match>

View File

@ -542,7 +542,7 @@ public static boolean isAclEnabled(Configuration conf) {
*/ */
public static final String GLOBAL_RM_AM_MAX_ATTEMPTS = public static final String GLOBAL_RM_AM_MAX_ATTEMPTS =
RM_PREFIX + "am.global.max-attempts"; RM_PREFIX + "am.global.max-attempts";
/** The keytab for the resource manager.*/ /** The keytab for the resource manager.*/
public static final String RM_KEYTAB = public static final String RM_KEYTAB =
RM_PREFIX + "keytab"; RM_PREFIX + "keytab";
@ -598,7 +598,7 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms"; RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
public static final int public static final int
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0; DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;
/** Path to file with nodes to exclude.*/ /** Path to file with nodes to exclude.*/
public static final String RM_NODES_EXCLUDE_FILE_PATH = public static final String RM_NODES_EXCLUDE_FILE_PATH =
RM_PREFIX + "nodes.exclude-path"; RM_PREFIX + "nodes.exclude-path";
@ -1551,7 +1551,7 @@ public static boolean isAclEnabled(Configuration conf) {
+ "log-aggregation.debug.filesize"; + "log-aggregation.debug.filesize";
public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE
= 100 * 1024 * 1024; = 100 * 1024 * 1024;
/** /**
* How long to wait between aggregated log retention checks. If set to * How long to wait between aggregated log retention checks. If set to
* a value {@literal <=} 0 then the value is computed as one-tenth of the * a value {@literal <=} 0 then the value is computed as one-tenth of the
@ -2201,7 +2201,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_NM_HEALTH_CHECK_TIMEOUT_MS = public static final long DEFAULT_NM_HEALTH_CHECK_TIMEOUT_MS =
2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;
/** Health check script time out period.*/ /** Health check script time out period.*/
public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE = public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE =
NM_PREFIX + "health-checker.%s.timeout-ms"; NM_PREFIX + "health-checker.%s.timeout-ms";
@ -2922,7 +2922,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Binding address for the web proxy. */ /** Binding address for the web proxy. */
public static final String PROXY_BIND_HOST = public static final String PROXY_BIND_HOST =
PROXY_PREFIX + "bind-host"; PROXY_PREFIX + "bind-host";
/** /**
* YARN Service Level Authorization * YARN Service Level Authorization
*/ */
@ -4340,6 +4340,24 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false; false;
/** Common prefix of the GPG configuration keys declared below. */
private static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg.";
/** The number of threads to use for the GPG scheduled executor service. */
public static final String GPG_SCHEDULED_EXECUTOR_THREADS =
FEDERATION_GPG_PREFIX + "scheduled.executor.threads";
/** Default number of threads for the GPG scheduled executor service. */
public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10;
/** The interval at which the subcluster cleaner runs, -1 means disabled. */
public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms";
/** Default subcluster cleaner interval in milliseconds: -1, i.e. disabled. */
public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
TimeUnit.MILLISECONDS.toMillis(-1);
/** The expiration time for a subcluster heartbeat, default is 30 minutes. */
public static final String GPG_SUBCLUSTER_EXPIRATION_MS =
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
/** Default subcluster heartbeat expiration: 30 minutes, expressed in milliseconds. */
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(30);
/** /**
* Connection and Read timeout from the Router to RM. * Connection and Read timeout from the Router to RM.
*/ */

View File

@ -5264,6 +5264,32 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>
The number of threads to use for the GPG scheduled executor service.
default is 10.
</description>
<name>yarn.federation.gpg.scheduled.executor.threads</name>
<value>10</value>
</property>
<property>
<description>
The interval at which the subcluster cleaner runs, -1 means disabled.
default is -1.
</description>
<name>yarn.federation.gpg.subcluster.cleaner.interval-ms</name>
<value>-1ms</value>
</property>
<property>
<description>
The expiration time for a subcluster heartbeat, default is 30 minutes.
</description>
<name>yarn.federation.gpg.subcluster.heartbeat.expiration-ms</name>
<value>30m</value>
</property>
<property> <property>
<name>yarn.apps.cache.enable</name> <name>yarn.apps.cache.enable</name>
<value>false</value> <value>false</value>

View File

@ -215,6 +215,17 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatReques
return SubClusterHeartbeatResponse.newInstance(); return SubClusterHeartbeatResponse.newInstance();
} }
/**
 * Overwrites the last-heartbeat timestamp recorded for a sub-cluster.
 *
 * @param subClusterId identifier of the sub-cluster to update
 * @param lastHeartbeat timestamp to store as the last heartbeat
 * @throws YarnException if the membership table has no entry for the
 *           given sub-cluster
 */
@VisibleForTesting
public void setSubClusterLastHeartbeat(SubClusterId subClusterId,
    long lastHeartbeat) throws YarnException {
  SubClusterInfo registered = membership.get(subClusterId);
  if (registered != null) {
    registered.setLastHeartBeat(lastHeartbeat);
    return;
  }
  // Unknown sub-cluster: nothing to update.
  String message = "Subcluster " + subClusterId.toString() + " does not exist";
  throw new YarnException(message);
}
@Override @Override
public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request) public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
throws YarnException { throws YarnException {

View File

@ -266,6 +266,26 @@ public Map<SubClusterId, SubClusterInfo> getSubClusters(final boolean filterInac
} }
} }
/**
 * Returns the {@link SubClusterInfo} of the known sub-clusters, optionally
 * asking the federation cache to drop its sub-cluster entries first so that
 * the result is read again from the central {@link FederationStateStore}.
 *
 * @param filterInactiveSubClusters whether to filter out inactive
 *          sub-clusters
 * @param flushCache whether the cached sub-clusters should be flushed before
 *          the lookup; has no effect when caching is disabled
 * @return the sub cluster information
 * @throws YarnException if the call to the state store is unsuccessful
 */
public Map<SubClusterId, SubClusterInfo> getSubClusters(
    final boolean filterInactiveSubClusters, final boolean flushCache)
    throws YarnException {
  final boolean mustFlush = flushCache && federationCache.isCachingEnabled();
  if (mustFlush) {
    LOG.info("Flushing subClusters from cache and rehydrating from store.");
    federationCache.removeSubCluster(flushCache);
  }
  final Map<SubClusterId, SubClusterInfo> subClusters =
      getSubClusters(filterInactiveSubClusters);
  return subClusters;
}
/** /**
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue. * Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
* *

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator; package org.apache.hadoop.yarn.server.globalpolicygenerator;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
@ -28,6 +31,7 @@
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,6 +59,10 @@ public class GlobalPolicyGenerator extends CompositeService {
// Federation Variables // Federation Variables
private GPGContext gpgContext; private GPGContext gpgContext;
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
public GlobalPolicyGenerator() { public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName()); super(GlobalPolicyGenerator.class.getName());
this.gpgContext = new GPGContextImpl(); this.gpgContext = new GPGContextImpl();
@ -78,6 +86,11 @@ protected void serviceInit(Configuration conf) throws Exception {
this.gpgContext this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance()); .setStateStoreFacade(FederationStateStoreFacade.getInstance());
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
DefaultMetricsSystem.initialize(METRICS_NAME); DefaultMetricsSystem.initialize(METRICS_NAME);
// super.serviceInit after all services are added // super.serviceInit after all services are added
@ -87,10 +100,33 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
super.serviceStart(); super.serviceStart();
// Schedule the SubClusterCleaner; it is only scheduled when the configured interval is positive
Configuration config = getConfig();
long scCleanerIntervalMs = config.getTimeDuration(
YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);
if (scCleanerIntervalMs > 0) {
this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
LOG.info("Scheduled sub-cluster cleaner with interval: {}",
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
try {
if (this.scheduledExecutorService != null
&& !this.scheduledExecutorService.isShutdown()) {
this.scheduledExecutorService.shutdown();
LOG.info("Stopped ScheduledExecutorService");
}
} catch (Exception e) {
LOG.error("Failed to shutdown ScheduledExecutorService", e);
throw e;
}
if (this.isStopping.getAndSet(true)) { if (this.isStopping.getAndSet(true)) {
return; return;
} }

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The sub-cluster cleaner is one of the GPG's services that periodically checks
 * the membership table in FederationStateStore and marks sub-clusters that have
 * not sent a heartbeat in a certain amount of time as LOST.
 */
public class SubClusterCleaner implements Runnable {

  private static final Logger LOG =
      LoggerFactory.getLogger(SubClusterCleaner.class);

  private final GPGContext gpgContext;
  private final long heartbeatExpirationMillis;

  /**
   * Creates the runnable that checks the membership table and deregisters, as
   * SC_LOST, usable sub-clusters that have not sent a heartbeat within the
   * configured expiration time.
   *
   * @param conf configuration; the heartbeat expiration is read from
   *          {@link YarnConfiguration#GPG_SUBCLUSTER_EXPIRATION_MS}.
   * @param gpgContext GPGContext giving access to the state store facade.
   */
  public SubClusterCleaner(Configuration conf, GPGContext gpgContext) {
    this.heartbeatExpirationMillis = conf.getTimeDuration(
        YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS,
        YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS, TimeUnit.MILLISECONDS);
    this.gpgContext = gpgContext;
    LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}",
        DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis));
  }

  /**
   * Runs one cleaning pass over all sub-clusters known to the state store.
   * Any failure is logged and never propagated to the caller.
   */
  @Override
  public void run() {
    try {
      Date now = new Date();
      LOG.info("SubClusterCleaner at {}", now);

      // Do not filter inactive sub-clusters (false) and flush the cache (true).
      Map<SubClusterId, SubClusterInfo> infoMap =
          this.gpgContext.getStateStoreFacade().getSubClusters(false, true);

      // Iterate over each sub cluster and check last heartbeat
      for (Map.Entry<SubClusterId, SubClusterInfo> entry : infoMap.entrySet()) {
        deregisterIfExpired(entry.getKey(), entry.getValue(), now);
      }
    } catch (Throwable e) {
      // Deliberately broad: a periodic task scheduled with
      // ScheduledExecutorService#scheduleAtFixedRate is not run again once an
      // execution throws, so nothing may escape from here.
      LOG.error("Subcluster cleaner fails: ", e);
    }
  }

  /**
   * Deregisters a usable sub-cluster as SC_LOST when its last heartbeat is
   * older than the configured expiration; otherwise leaves it untouched.
   *
   * @param subClusterId key of the sub-cluster in the membership map.
   * @param subClusterInfo membership record of the sub-cluster.
   * @param now reference time of the current cleaning pass.
   */
  private void deregisterIfExpired(SubClusterId subClusterId,
      SubClusterInfo subClusterInfo, Date now) {
    Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat());
    LOG.debug("Checking subcluster {} in state {}, last heartbeat at {}",
        subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
        lastHeartBeat);

    // Only sub-clusters in a usable state are candidates for deregistration.
    if (!subClusterInfo.getState().isUsable()) {
      return;
    }

    long timeUntilDeregister = this.heartbeatExpirationMillis
        - (now.getTime() - lastHeartBeat.getTime());

    // Deregister sub-cluster as SC_LOST if last heartbeat too old
    if (timeUntilDeregister < 0) {
      LOG.warn(
          "Deregistering subcluster {} in state {} last heartbeat at {}",
          subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
          lastHeartBeat);
      try {
        this.gpgContext.getStateStoreFacade().deregisterSubCluster(
            subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST);
      } catch (Exception e) {
        // Best effort: keep processing the remaining sub-clusters.
        LOG.error("deregisterSubCluster failed on subcluster {}",
            subClusterInfo.getSubClusterId(), e);
      }
    } else if (LOG.isDebugEnabled()) {
      // Guarded so the duration is only formatted when it will be logged.
      LOG.debug("Time until deregister for subcluster {}: {}",
          subClusterId,
          DurationFormatUtils.formatDurationISO(timeUntilDeregister));
    }
  }
}

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Unit test for Sub-cluster Cleaner in GPG.
 */
public class TestSubClusterCleaner {

  private static final long TWO_SECONDS = TimeUnit.SECONDS.toMillis(2);

  /** Number of sub-clusters registered by {@link #setup()}. */
  private static final int NUM_SUBCLUSTERS = 3;

  private Configuration conf;
  private MemoryFederationStateStore stateStore;
  private FederationStateStoreFacade facade;
  private SubClusterCleaner cleaner;
  private GPGContext gpgContext;

  private ArrayList<SubClusterId> subClusterIds;

  /**
   * Builds an in-memory state store with a one second heartbeat expiration
   * and registers {@link #NUM_SUBCLUSTERS} running sub-clusters in it.
   */
  @Before
  public void setup() throws YarnException {
    conf = new YarnConfiguration();

    // subcluster expires in one second
    conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1000);

    stateStore = new MemoryFederationStateStore();
    stateStore.init(conf);

    facade = FederationStateStoreFacade.getInstance();
    facade.reinitialize(stateStore, conf);

    gpgContext = new GPGContextImpl();
    gpgContext.setStateStoreFacade(facade);

    cleaner = new SubClusterCleaner(conf, gpgContext);

    // Create and register three sub clusters
    subClusterIds = new ArrayList<>();
    for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
      // Create sub cluster id and info
      SubClusterId subClusterId = SubClusterId.newInstance("SUBCLUSTER-" + i);

      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
          "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4",
          SubClusterState.SC_RUNNING, System.currentTimeMillis(), "");
      // Register the sub cluster
      stateStore.registerSubCluster(
          SubClusterRegisterRequest.newInstance(subClusterInfo));
      // Append the id to a local list
      subClusterIds.add(subClusterId);
    }
  }

  @After
  public void breakDown() throws Exception {
    stateStore.close();
  }

  /**
   * Freshly registered sub-clusters must all still be reported after a
   * cleaner run.
   */
  @Test
  public void testSubClusterRegisterHeartBeatTime() throws YarnException {
    cleaner.run();
    Assert.assertEquals(NUM_SUBCLUSTERS, facade.getSubClusters(true, true).size());
  }

  /**
   * Test the base use case.
   */
  @Test
  public void testSubClusterHeartBeat() throws YarnException {
    // The first subcluster reports as Unhealthy
    SubClusterId subClusterId = subClusterIds.get(0);
    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
        .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity"));

    // The second subcluster didn't heartbeat for two seconds, should mark lost
    subClusterId = subClusterIds.get(1);
    stateStore.setSubClusterLastHeartbeat(subClusterId,
        System.currentTimeMillis() - TWO_SECONDS);

    cleaner.run();
    // Only one of the three sub-clusters is expected to be returned now.
    Assert.assertEquals(1, facade.getSubClusters(true, true).size());
  }
}