From a5f48eacca4b80c0cc31f7de96652ff120179c01 Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Thu, 16 Feb 2023 06:40:34 +0800
Subject: [PATCH] YARN-11425. [Federation] Router Supports SubClusterCleaner.
(#5326)
---
.../hadoop/yarn/conf/YarnConfiguration.java | 22 +++
.../src/main/resources/yarn-default.xml | 40 +++++
.../impl/MemoryFederationStateStore.java | 11 ++
.../utils/FederationStateStoreFacade.java | 24 +++
.../hadoop/yarn/server/router/Router.java | 37 +++-
.../router/cleaner/SubClusterCleaner.java | 92 ++++++++++
.../server/router/cleaner/package-info.java | 20 +++
.../router/cleaner/TestSubClusterCleaner.java | 158 ++++++++++++++++++
8 files changed, 398 insertions(+), 6 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/SubClusterCleaner.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/package-info.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/cleaner/TestSubClusterCleaner.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 316a642188..699059f068 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4299,6 +4299,28 @@ public class YarnConfiguration extends Configuration {
ROUTER_PREFIX + "interceptor.allow-partial-result.enable";
public static final boolean DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = false;
+ /** Router SubCluster Cleaner Thread Clean Interval Time. **/
+ public static final String ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME =
+ ROUTER_PREFIX + "subcluster.cleaner.interval.time";
+ public static final long DEFAULT_ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME =
+ TimeUnit.SECONDS.toMillis(60);
+
+ /** Router SubCluster Timeout Allowed by Router. **/
+ public static final String ROUTER_SUBCLUSTER_EXPIRATION_TIME =
+ ROUTER_PREFIX + "subcluster.heartbeat.expiration.time";
+ public static final long DEFAULT_ROUTER_SUBCLUSTER_EXPIRATION_TIME =
+ TimeUnit.MINUTES.toMillis(30);
+
+ /** Router Thread Pool Schedule Thread Number. **/
+ public static final String ROUTER_SCHEDULED_EXECUTOR_THREADS =
+ ROUTER_PREFIX + "scheduled.executor.threads";
+ public static final int DEFAULT_ROUTER_SCHEDULED_EXECUTOR_THREADS = 1;
+
+ /** Enable DeregisterSubCluster, enabled by default. **/
+ public static final String ROUTER_DEREGISTER_SUBCLUSTER_ENABLED =
+ ROUTER_PREFIX + "deregister.subcluster.enabled";
+ public static final boolean DEFAULT_ROUTER_DEREGISTER_SUBCLUSTER_ENABLED = true;
+
////////////////////////////////
// CSI Volume configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 80672fb1cc..dc58f2f828 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5117,4 +5117,44 @@
+
+
+ The number of threads to use for the Router scheduled executor service.
+
+ yarn.router.subcluster.cleaner.interval.time
+ 1
+
+
+
+
+ The interval at which the subClusterCleaner runs. Default is 60s.
+
+ yarn.router.subcluster.cleaner.interval.time
+ 60s
+
+
+
+
+ SubCluster heartbeat timeout. Default is 30mins.
+
+ yarn.router.subcluster.heartbeat.expiration.time
+ 30m
+
+
+
+
+ Whether to enable deregisterSubCluster. Default is true.
+
+ yarn.router.deregister.subcluster.enabled
+ true
+
+
+
+
+ Number of Router Scheduler Threads.
+
+ yarn.router.scheduled.executor.threads
+ 1
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index d44c30eef2..b91de3ae80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -34,6 +34,7 @@ import java.util.Comparator;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -606,4 +607,14 @@ public class MemoryFederationStateStore implements FederationStateStore {
public void setMembership(Map membership) {
this.membership = membership;
}
+
+ @VisibleForTesting
+ public void setExpiredHeartbeat(SubClusterId subClusterId, long heartBearTime)
+ throws YarnRuntimeException {
+ if(!membership.containsKey(subClusterId)){
+ throw new YarnRuntimeException("subClusterId = " + subClusterId + "not exist");
+ }
+ SubClusterInfo subClusterInfo = membership.get(subClusterId);
+ subClusterInfo.setLastHeartBeat(heartBearTime);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 12625a60e9..ebad527b6d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -89,6 +89,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1187,4 +1190,25 @@ public final class FederationStateStoreFacade {
reservationHomeSubCluster);
}
}
+
+ /**
+ * Deregister subCluster, Update the subCluster state to
+ * SC_LOST、SC_DECOMMISSIONED etc.
+ *
+ * @param subClusterId subClusterId.
+ * @param subClusterState The state of the subCluster to be updated.
+ * @throws YarnException yarn exception.
+ * @return If Deregister subCluster is successful, return true, otherwise, return false.
+ */
+ public boolean deregisterSubCluster(SubClusterId subClusterId,
+ SubClusterState subClusterState) throws YarnException {
+ SubClusterDeregisterRequest deregisterRequest =
+ SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState);
+ SubClusterDeregisterResponse response = stateStore.deregisterSubCluster(deregisterRequest);
+ // If the response is not empty, deregisterSubCluster is successful.
+ if (response != null) {
+ return true;
+ }
+ return false;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 24e9ad23c9..77abf18bd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.router;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
+import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebApp;
@@ -50,6 +54,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_ROUTER_DEREGISTER_SUBCLUSTER_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.ROUTER_DEREGISTER_SUBCLUSTER_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.ROUTER_SCHEDULED_EXECUTOR_THREADS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_ROUTER_SCHEDULED_EXECUTOR_THREADS;
+
/**
* The router is a stateless YARN component which is the entry point to the
* cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
@@ -88,6 +99,9 @@ public class Router extends CompositeService {
private static final String METRICS_NAME = "Router";
+ private ScheduledThreadPoolExecutor scheduledExecutorService;
+ private SubClusterCleaner subClusterCleaner;
+
public Router() {
super(Router.class.getName());
}
@@ -117,6 +131,12 @@ public class Router extends CompositeService {
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor);
+ // Initialize subClusterCleaner
+ this.subClusterCleaner = new SubClusterCleaner(this.conf);
+ int scheduledExecutorThreads = conf.getInt(ROUTER_SCHEDULED_EXECUTOR_THREADS,
+ DEFAULT_ROUTER_SCHEDULED_EXECUTOR_THREADS);
+ this.scheduledExecutorService = new ScheduledThreadPoolExecutor(scheduledExecutorThreads);
+
WebServiceClient.initialize(config);
super.serviceInit(conf);
}
@@ -128,6 +148,16 @@ public class Router extends CompositeService {
} catch (IOException e) {
throw new YarnRuntimeException("Failed Router login", e);
}
+ boolean isDeregisterSubClusterEnabled = this.conf.getBoolean(
+ ROUTER_DEREGISTER_SUBCLUSTER_ENABLED, DEFAULT_ROUTER_DEREGISTER_SUBCLUSTER_ENABLED);
+ if (isDeregisterSubClusterEnabled) {
+ long scCleanerIntervalMs = this.conf.getTimeDuration(ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME,
+ DEFAULT_ROUTER_SUBCLUSTER_CLEANER_INTERVAL_TIME, TimeUnit.MINUTES);
+ this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
+ 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled SubClusterCleaner With Interval: {}.",
+ DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
+ }
startWepApp();
super.serviceStart();
}
@@ -146,12 +176,7 @@ public class Router extends CompositeService {
}
protected void shutDown() {
- new Thread() {
- @Override
- public void run() {
- Router.this.stop();
- }
- }.start();
+ new Thread(() -> Router.this.stop()).start();
}
protected RouterClientRMService createClientRMProxyService() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/SubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/SubClusterCleaner.java
new file mode 100644
index 0000000000..1147f7742d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/SubClusterCleaner.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The SubClusterCleaner thread is used to check whether the SubCluster
+ * has exceeded the heartbeat time.
+ * If the SubCluster heartbeat time exceeds 30 mins, set the SubCluster to LOST.
+ * Check the thread every 1 mins, check once.
+ */
+public class SubClusterCleaner implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SubClusterCleaner.class);
+ private FederationStateStoreFacade federationFacade;
+ private long heartbeatExpirationMillis;
+
+ public SubClusterCleaner(Configuration conf) {
+ federationFacade = FederationStateStoreFacade.getInstance();
+ this.heartbeatExpirationMillis =
+ conf.getTimeDuration(YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME,
+ YarnConfiguration.DEFAULT_ROUTER_SUBCLUSTER_EXPIRATION_TIME, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Step1. Get Current Time.
+ Date now = new Date();
+ LOG.info("SubClusterCleaner at {}.", now);
+
+ Map subClusters = federationFacade.getSubClusters(true);
+
+ for (Map.Entry subCluster : subClusters.entrySet()) {
+ // Step2. Get information about subClusters.
+ SubClusterId subClusterId = subCluster.getKey();
+ SubClusterInfo subClusterInfo = subCluster.getValue();
+ SubClusterState subClusterState = subClusterInfo.getState();
+ long lastHeartBeatTime = subClusterInfo.getLastHeartBeat();
+
+ // We Only Check SubClusters in NEW and RUNNING states
+ if (!subClusterState.isUnusable()) {
+ long heartBeatInterval = now.getTime() - lastHeartBeatTime;
+ try {
+ // HeartBeat Interval Exceeds Expiration Time
+ if (heartBeatInterval > heartbeatExpirationMillis) {
+ LOG.info("Deregister SubCluster {} in state {} last heartbeat at {}.",
+ subClusterId, subClusterState, new Date(lastHeartBeatTime));
+ federationFacade.deregisterSubCluster(subClusterId, SubClusterState.SC_LOST);
+ }
+ } catch (YarnException e) {
+ LOG.error("deregisterSubCluster failed on SubCluster {}.", subClusterId, e);
+ }
+ } else {
+ LOG.debug("SubCluster {} in state {} last heartbeat at {}, " +
+ "heartbeat interval < 30mins, no need for Deregister.",
+ subClusterId, subClusterState, new Date(lastHeartBeatTime));
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("SubClusterCleaner Fails.", e);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/package-info.java
new file mode 100644
index 0000000000..75477508cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/cleaner/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router Cleaner package. **/
+package org.apache.hadoop.yarn.server.router.cleaner;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/cleaner/TestSubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/cleaner/TestSubClusterCleaner.java
new file mode 100644
index 0000000000..57d427581a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/cleaner/TestSubClusterCleaner.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+public class TestSubClusterCleaner {
+
+ ////////////////////////////////
+ // Router Constants
+ ////////////////////////////////
+ private Configuration conf;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreFacade facade;
+ private SubClusterCleaner cleaner;
+ private final static int NUM_SUBCLUSTERS = 4;
+ private final static long EXPIRATION_TIME = Time.now() - 5000;
+
+ @Before
+ public void setup() throws YarnException {
+ conf = new YarnConfiguration();
+ conf.setLong(YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME, 1000);
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+
+ facade = FederationStateStoreFacade.getInstance();
+ facade.reinitialize(stateStore, conf);
+
+ cleaner = new SubClusterCleaner(conf);
+ for (int i = 0; i < NUM_SUBCLUSTERS; i++){
+ // Create sub cluster id and info
+ SubClusterId subClusterId = SubClusterId.newInstance("SC-" + i);
+ SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+ "127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3", "127.0.0.1:4",
+ SubClusterState.SC_RUNNING, Time.now(), "");
+ // Register the subCluster
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ }
+ }
+
+ @Test
+ public void testSubClustersWithOutHeartBeat()
+ throws InterruptedException, TimeoutException, YarnException {
+
+ // We set up such a unit test, We set the status of all subClusters to RUNNING,
+ // and Manually set subCluster heartbeat expiration.
+ // At this time, the size of the Active SubCluster is 0.
+ Map subClustersMap = facade.getSubClusters(false);
+
+ // Step1. Manually set subCluster heartbeat expiration.
+ // subCluster has no heartbeat, and all subClusters will expire.
+ subClustersMap.keySet().forEach(subClusterId ->
+ stateStore.setExpiredHeartbeat(subClusterId, EXPIRATION_TIME));
+
+ // Step2. Run the Cleaner to change the status of the expired SubCluster to SC_LOST.
+ cleaner.run();
+
+ // Step3. All clusters have expired,
+ // so the current Federation has no active subClusters.
+ int count = facade.getActiveSubClustersCount();
+ Assert.assertEquals(0, count);
+
+ // Step4. Check Active SubCluster Status.
+ // We want all subClusters to be SC_LOST.
+ subClustersMap.values().forEach(subClusterInfo -> {
+ SubClusterState subClusterState = subClusterInfo.getState();
+ Assert.assertEquals(SubClusterState.SC_LOST, subClusterState);
+ });
+ }
+
+ @Test
+ public void testSubClustersPartWithHeartBeat() throws YarnException, InterruptedException {
+
+ // Step1. Manually set subCluster heartbeat expiration.
+ for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
+ // Create subCluster id and info.
+ expiredSubcluster("SC-" + i);
+ }
+
+ // Step2. Run the Cleaner to change the status of the expired SubCluster to SC_LOST.
+ cleaner.run();
+
+ // Step3. Let SC-0, SC-1 resume heartbeat.
+ resumeSubClusterHeartbeat("SC-0");
+ resumeSubClusterHeartbeat("SC-1");
+
+ // Step4. At this point we should have 2 subClusters that are surviving clusters.
+ int count = facade.getActiveSubClustersCount();
+ Assert.assertEquals(2, count);
+
+ // Step5. The result we expect is that SC-0 and SC-1 are in the RUNNING state,
+ // and SC-2 and SC-3 are in the SC_LOST state.
+ checkSubClusterState("SC-0", SubClusterState.SC_RUNNING);
+ checkSubClusterState("SC-1", SubClusterState.SC_RUNNING);
+ checkSubClusterState("SC-2", SubClusterState.SC_LOST);
+ checkSubClusterState("SC-3", SubClusterState.SC_LOST);
+ }
+
+ private void resumeSubClusterHeartbeat(String pSubClusterId)
+ throws YarnException {
+ SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
+ SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest.newInstance(
+ subClusterId, Time.now(), SubClusterState.SC_RUNNING, "test");
+ SubClusterHeartbeatResponse response = stateStore.subClusterHeartbeat(request);
+ Assert.assertNotNull(response);
+ }
+
+ private void expiredSubcluster(String pSubClusterId) {
+ SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
+ stateStore.setExpiredHeartbeat(subClusterId, EXPIRATION_TIME);
+ }
+
+ private void checkSubClusterState(String pSubClusterId, SubClusterState expectState)
+ throws YarnException {
+ Map subClustersMap = facade.getSubClusters(false);
+ SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
+ SubClusterInfo subClusterInfo = subClustersMap.get(subClusterId);
+ if (subClusterInfo == null) {
+ throw new YarnException("subClusterId=" + pSubClusterId + " does not exist.");
+ }
+ Assert.assertEquals(expectState, subClusterInfo.getState());
+ }
+}