From 8b88e9f8f4fa6a98fba71f1fb5bd8a674cc8a400 Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Wed, 12 Jul 2023 09:47:07 +0800
Subject: [PATCH] YARN-11509. The FederationInterceptor#launchUAM Added retry
logic. (#5727)
---
.../hadoop/yarn/conf/YarnConfiguration.java | 14 ++
.../src/main/resources/yarn-default.xml | 18 ++
.../amrmproxy/FederationInterceptor.java | 174 +++++++++++-------
.../amrmproxy/TokenAndRegisterResponse.java | 45 +++++
.../amrmproxy/TestFederationInterceptor.java | 65 +++++--
.../TestableFederationInterceptor.java | 19 ++
6 files changed, 255 insertions(+), 80 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1e1fbb5993..648fddbbbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4058,6 +4058,20 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
60000; // one minute
+ // AMRMProxy Register UAM Retry-Num
+ public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+ FEDERATION_PREFIX + "amrmproxy.register.uam.retry-count";
+ // Register a UAM , we will retry a maximum of 3 times.
+ public static final int DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+ 3;
+
+ // AMRMProxy Register UAM Retry Interval
+ public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+ FEDERATION_PREFIX + "amrmproxy.register.uam.interval";
+ // Retry Interval, default 100 ms
+ public static final long DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+ TimeUnit.MILLISECONDS.toMillis(100);
+
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ "policy-manager";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 132f08f6b0..f722af852f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5408,4 +5408,22 @@
+
+
+ The number of retry for Register UAM.
+ The default value is 3.
+
+ yarn.federation.amrmproxy.register.uam.retry-count
+ 3
+
+
+
+
+ Interval between retry for Register UAM.
+ The default value is 100ms.
+
+ yarn.federation.amrmproxy.register.uam.interval
+ 100ms
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 14a2d60c2b..32c5bf217e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -36,6 +36,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -87,6 +88,7 @@
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
@@ -251,6 +253,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs;
+ private int registerUamRetryNum;
+
+ private long registerUamRetryInterval;
+
private boolean waitUamRegisterDone;
private MonotonicClock clock = new MonotonicClock();
@@ -355,6 +361,24 @@ public void init(AMRMProxyApplicationContext appContext) {
this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
}
+
+ this.registerUamRetryNum = conf.getInt(
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+ if (this.registerUamRetryNum <= 0) {
+ LOG.info("{} configured to be {}, should be positive. Using default of {}.",
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+ this.subClusterTimeOut,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+ this.registerUamRetryNum =
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT;
+ }
+
+ this.registerUamRetryInterval = conf.getTimeDuration(
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+
this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
}
@@ -701,7 +725,7 @@ public AllocateResponse allocate(AllocateRequest request)
if (this.finishAMCalled) {
LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
- + "processing and return dummy response" + this.attemptId);
+ + "processing and return dummy response.", this.attemptId);
return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
}
@@ -1255,85 +1279,77 @@ private List registerAndAllocateWithNewSubClusters(
// Check to see if there are any new sub-clusters in this request
// list and create and register Unmanaged AM instance for the new ones
List newSubClusters = new ArrayList<>();
- for (SubClusterId subClusterId : requests.keySet()) {
- if (!subClusterId.equals(this.homeSubClusterId)
- && !this.uamPool.hasUAMId(subClusterId.getId())) {
- newSubClusters.add(subClusterId);
+ requests.keySet().stream().forEach(subClusterId -> {
+ String id = subClusterId.getId();
+ if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
+ newSubClusters.add(subClusterId);
// Set sub-cluster to be timed out initially
- lastSCResponseTime.put(subClusterId,
- clock.getTime() - subClusterTimeOut);
+ lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
}
- }
+ });
this.uamRegisterFutures.clear();
+
for (final SubClusterId scId : newSubClusters) {
- Future> future = this.threadpool.submit(new Runnable() {
- @Override
- public void run() {
- String subClusterId = scId.getId();
- // Create a config loaded with federation on and subclusterId
- // for each UAM
- YarnConfiguration config = new YarnConfiguration(getConf());
- FederationProxyProviderUtil.updateConfForFederation(config,
- subClusterId);
+ Future> future = this.threadpool.submit(() -> {
- RegisterApplicationMasterResponse uamResponse = null;
- Token token = null;
- try {
- ApplicationId applicationId = attemptId.getApplicationId();
- ApplicationSubmissionContext originalSubmissionContext =
- federationFacade.getApplicationSubmissionContext(applicationId);
+ String subClusterId = scId.getId();
- // For appNameSuffix, use subClusterId of the home sub-cluster
- token = uamPool.launchUAM(subClusterId, config,
- applicationId, amRegistrationResponse.getQueue(),
- getApplicationContext().getUser(), homeSubClusterId.toString(),
- true, subClusterId, originalSubmissionContext);
+ // Create a config loaded with federation on and subclusterId
+ // for each UAM
+ YarnConfiguration config = new YarnConfiguration(getConf());
+ FederationProxyProviderUtil.updateConfForFederation(config, subClusterId);
+ ApplicationId applicationId = attemptId.getApplicationId();
- secondaryRelayers.put(subClusterId,
- uamPool.getAMRMClientRelayer(subClusterId));
+ RegisterApplicationMasterResponse uamResponse;
+ Token token;
- uamResponse = uamPool.registerApplicationMaster(subClusterId,
- amRegistrationRequest);
- } catch (Throwable e) {
- LOG.error("Failed to register application master: " + subClusterId
- + " Application: " + attemptId, e);
- // TODO: UAM registration for this sub-cluster RM
- // failed. For now, we ignore the resource requests and continue
- // but we need to fix this and handle this situation. One way would
- // be to send the request to another RM by consulting the policy.
- return;
- }
- uamRegistrations.put(scId, uamResponse);
- LOG.info("Successfully registered unmanaged application master: "
- + subClusterId + " ApplicationId: " + attemptId);
-
- try {
- uamPool.allocateAsync(subClusterId, requests.get(scId),
- new HeartbeatCallBack(scId, true));
- } catch (Throwable e) {
- LOG.error("Failed to allocate async to " + subClusterId
- + " Application: " + attemptId, e);
- }
-
- // Save the UAM token in registry or NMSS
- try {
- if (registryClient != null) {
- registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
- subClusterId, token);
- } else if (getNMStateStore() != null) {
- getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
- NMSS_SECONDARY_SC_PREFIX + subClusterId,
- token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
- }
- } catch (Throwable e) {
- LOG.error("Failed to persist UAM token from " + subClusterId
- + " Application: " + attemptId, e);
+ // LaunchUAM And RegisterApplicationMaster
+ try {
+ TokenAndRegisterResponse result =
+ ((FederationActionRetry) (retryCount) ->
+ launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId)).
+ runWithRetries(registerUamRetryNum, registerUamRetryInterval);
+
+ token = result.getToken();
+ uamResponse = result.getResponse();
+ } catch (Throwable e) {
+ LOG.error("Failed to register application master: {} Application: {}.",
+ subClusterId, attemptId, e);
+ return;
+ }
+
+ uamRegistrations.put(scId, uamResponse);
+
+ LOG.info("Successfully registered unmanaged application master: {} " +
+ "ApplicationId: {}.", subClusterId, attemptId);
+
+ // Allocate Request
+ try {
+ uamPool.allocateAsync(subClusterId, requests.get(scId),
+ new HeartbeatCallBack(scId, true));
+ } catch (Throwable e) {
+ LOG.error("Failed to allocate async to {} Application: {}.",
+ subClusterId, attemptId, e);
+ }
+
+ // Save the UAM token in registry or NMSS
+ try {
+ if (registryClient != null) {
+ registryClient.writeAMRMTokenForUAM(applicationId, subClusterId, token);
+ } else if (getNMStateStore() != null) {
+ getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+ NMSS_SECONDARY_SC_PREFIX + subClusterId,
+ token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
}
+ } catch (Throwable e) {
+ LOG.error("Failed to persist UAM token from {} Application {}",
+ subClusterId, attemptId, e);
}
});
+
this.uamRegisterFutures.put(scId, future);
}
@@ -1347,10 +1363,34 @@ public void run() {
}
}
-
return newSubClusters;
}
+ protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(
+ YarnConfiguration config, String subClusterId, ApplicationId applicationId)
+ throws IOException, YarnException {
+
+ // Prepare parameter information
+ ApplicationSubmissionContext originalSubmissionContext =
+ federationFacade.getApplicationSubmissionContext(applicationId);
+ String submitter = getApplicationContext().getUser();
+ String homeRM = homeSubClusterId.toString();
+ String queue = amRegistrationResponse.getQueue();
+
+ // For appNameSuffix, use subClusterId of the home sub-cluster
+ Token token = uamPool.launchUAM(subClusterId, config, applicationId,
+ queue, submitter, homeRM, true, subClusterId, originalSubmissionContext);
+
+ // Set the relationship between SubCluster and AMRMClientRelayer.
+ secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId));
+
+ // RegisterApplicationMaster
+ RegisterApplicationMasterResponse uamResponse =
+ uamPool.registerApplicationMaster(subClusterId, amRegistrationRequest);
+
+ return new TokenAndRegisterResponse(token, uamResponse);
+ }
+
/**
* Prepare the base allocation response. Use lastSCResponse and
* lastHeartbeatTimeStamp to assemble entries about cluster-wide info, e.g.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
new file mode 100644
index 0000000000..d67ecab9a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * This class contains information about the AMRM token and the RegisterApplicationMasterResponse.
+ */
+public class TokenAndRegisterResponse {
+ private Token token;
+ private RegisterApplicationMasterResponse response;
+
+ public TokenAndRegisterResponse(Token pToken,
+ RegisterApplicationMasterResponse pResponse) {
+ this.token = pToken;
+ this.response = pResponse;
+ }
+
+ public Token getToken() {
+ return token;
+ }
+
+ public RegisterApplicationMasterResponse getResponse() {
+ return response;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 9e87543723..9e3e73f7f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -179,9 +178,8 @@ protected YarnConfiguration createConfiguration() {
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500);
- // Wait UAM Register Down
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);
-
+ // Register UAM Retry Interval 1ms
+ conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL, 1);
return conf;
}
@@ -597,10 +595,6 @@ public Object run() throws Exception {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
- // Waiting for SC-1 to time out.
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
-
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
@@ -859,7 +853,7 @@ public Object run() throws Exception {
List containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
- LOG.info("Allocated container {}", c.getId());
+ LOG.info("Allocated container " + c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
@@ -893,10 +887,6 @@ public Object run() throws Exception {
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
- // Waiting for SC-1 to time out.
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
-
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
@@ -1444,4 +1434,53 @@ private void finishApplication() throws IOException, YarnException {
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
}
+
+ @Test
+ public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception {
+
+ UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+ interceptor.setRetryCount(2);
+
+ ugi.doAs((PrivilegedExceptionAction