diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 34257eda55..ead99779fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2096,6 +2096,9 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
"org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+ "DefaultRequestInterceptor";
+ /**
+ * Configuration key controlling whether AMRMProxy HA is enabled; the
+ * matching yarn-default.xml entry is yarn.nodemanager.amrmproxy.ha.enable.
+ */
+ public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX
+ + "amrmproxy.ha.enable";
+ /** AMRMProxy HA is disabled unless explicitly configured. */
+ public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false;
/**
* Default platform-agnostic CLASSPATH for YARN applications. A
@@ -2930,6 +2933,11 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
+ /** Configuration key for the registry base directory used by federation. */
+ public static final String FEDERATION_REGISTRY_BASE_KEY =
+ FEDERATION_PREFIX + "registry.base-dir";
+ /**
+ * Default registry base directory for federation. The value ends with the
+ * path separator.
+ */
+ public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
+ "yarnfederation/";
+
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
@@ -3087,6 +3095,11 @@ public class YarnConfiguration extends Configuration {
// Other Configs
////////////////////////////////
+ /** Configuration key naming the registry implementation class to use. */
+ public static final String YARN_REGISTRY_CLASS =
+ YARN_PREFIX + "registry.class";
+ /** Fully qualified class name of the default registry implementation. */
+ public static final String DEFAULT_YARN_REGISTRY_CLASS =
+ "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService";
+
/**
* Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
* The interval of the yarn client's querying application state after
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e90d0f253f..12cb90273f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2826,7 +2826,20 @@
300
+  <property>
+    <description>The registry base directory for federation.</description>
+    <name>yarn.federation.registry.base-dir</name>
+    <value>yarnfederation/</value>
+  </property>
+
+
+  <property>
+    <description>The registry implementation to use.</description>
+    <name>yarn.registry.class</name>
+    <value>org.apache.hadoop.registry.client.impl.FSRegistryOperationsService</value>
+  </property>
+
The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol.
@@ -2987,6 +3000,14 @@
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor
+  <property>
+    <description>
+      Whether AMRMProxy HA is enabled.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.ha.enable</name>
+    <value>false</value>
+  </property>
+
Setting that controls whether distributed scheduling is enabled.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 43ae3af403..cd5195dfa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -66,6 +66,11 @@
test
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+    </dependency>
+
com.google.guava
guava
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
new file mode 100644
index 0000000000..662431836a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Helper class that handles reads and writes to Yarn Registry to support UAM HA
+ * and second attempt.
+ */
+public class FederationRegistryClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationRegistryClient.class);
+
+  // Registry implementation that every read/write/list/delete goes through
+  private RegistryOperations registry;
+
+  // Identity under which all registry operations are executed (via doAs)
+  private UserGroupInformation user;
+
+  // In-memory view of the tokens: AppId -> SubClusterId -> UAM token
+  private Map<ApplicationId, Map<String, Token<AMRMTokenIdentifier>>>
+      appSubClusterTokenMap;
+
+  // Structure in registry:
+  // <registryBaseDir>/<AppId>/<SubClusterId> -> UAMToken
+  private String registryBaseDir;
+
+  /**
+   * Creates a client that stores UAM tokens below the configured federation
+   * registry base directory.
+   *
+   * @param conf configuration, read for
+   *          {@link YarnConfiguration#FEDERATION_REGISTRY_BASE_KEY}
+   * @param registry registry implementation to read from and write to
+   * @param user user whose identity is used for every registry operation
+   */
+  public FederationRegistryClient(Configuration conf,
+      RegistryOperations registry, UserGroupInformation user) {
+    this.registry = registry;
+    this.user = user;
+    this.appSubClusterTokenMap = new ConcurrentHashMap<>();
+
+    String baseDir = conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY,
+        YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY);
+    // Registry keys are built by plain concatenation (base dir + app id), so
+    // a configured value without the trailing separator would glue the
+    // application id onto the directory name. Normalize it once here.
+    if (!baseDir.isEmpty() && !baseDir.endsWith("/")) {
+      baseDir = baseDir + "/";
+    }
+    this.registryBaseDir = baseDir;
+
+    LOG.info("Using registry {} with base directory: {}",
+        this.registry.getClass().getName(), this.registryBaseDir);
+  }
+
+  /**
+   * Get the list of known applications in the registry.
+   *
+   * @return the list of known applications; empty, never null, when the base
+   *         directory cannot be listed
+   */
+  public List<String> getAllApplications() {
+    // Suppress the exception here because it is valid that the entry does not
+    // exist
+    List<String> applications = null;
+    try {
+      applications = listDirRegistry(this.registry, this.user,
+          getRegistryKey(null, null), false);
+    } catch (YarnException e) {
+      LOG.warn("Unexpected exception from listDirRegistry", e);
+    }
+    if (applications == null) {
+      // It is valid for listDirRegistry to return null
+      return new ArrayList<>();
+    }
+    return applications;
+  }
+
+  /**
+   * Test-only helper: recursively deletes the federation base directory and
+   * with it every application record. A failure is logged and otherwise
+   * ignored.
+   */
+  @VisibleForTesting
+  public void cleanAllApplications() {
+    final boolean recursive = true;
+    final boolean throwIfFails = false;
+    try {
+      String baseKey = getRegistryKey(null, null);
+      removeKeyRegistry(this.registry, this.user, baseKey, recursive,
+          throwIfFails);
+    } catch (YarnException e) {
+      LOG.warn("Unexpected exception from removeKeyRegistry", e);
+    }
+  }
+
+  /**
+   * Write/update the UAM token for an application and a sub-cluster.
+   *
+   * The registry is only written when the token differs from the one held in
+   * memory. The in-memory entry is updated only after a successful write, so
+   * a failed write is attempted again on the next call with the same token.
+   * The method is synchronized so that two concurrent callers for the same
+   * application cannot each create, and then overwrite, the per-application
+   * map.
+   *
+   * @param appId application id
+   * @param subClusterId sub-cluster id of the token
+   * @param token the UAM of the application
+   * @return whether the amrmToken is added or updated to a new value; this is
+   *         also true when the registry write itself failed
+   */
+  public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
+      String subClusterId, Token<AMRMTokenIdentifier> token) {
+    Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
+        this.appSubClusterTokenMap.get(appId);
+    if (subClusterTokenMap == null) {
+      subClusterTokenMap = new ConcurrentHashMap<>();
+      this.appSubClusterTokenMap.put(appId, subClusterTokenMap);
+    }
+
+    boolean update = !token.equals(subClusterTokenMap.get(subClusterId));
+    if (!update) {
+      LOG.debug("Same amrmToken received from {}, skip writing registry for {}",
+          subClusterId, appId);
+      return update;
+    }
+
+    LOG.info("Writing/Updating amrmToken for {} to registry for {}",
+        subClusterId, appId);
+    try {
+      // First, write the token entry
+      writeRegistry(this.registry, this.user,
+          getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true);
+
+      // Then update the subClusterTokenMap
+      subClusterTokenMap.put(subClusterId, token);
+    } catch (YarnException | IOException e) {
+      LOG.error(
+          "Failed writing AMRMToken to registry for subcluster " + subClusterId,
+          e);
+    }
+    return update;
+  }
+
+  /**
+   * Load the information of one application from registry.
+   *
+   * Sub-clusters whose entry cannot be read or decoded are logged and left
+   * out of the result. When the application directory can be listed, the
+   * in-memory state for the application is replaced by what was loaded.
+   *
+   * @param appId application id
+   * @return the sub-cluster to UAM token mapping; empty when the application
+   *         does not exist in the registry
+   */
+  public Map<String, Token<AMRMTokenIdentifier>>
+      loadStateFromRegistry(ApplicationId appId) {
+    Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
+    // Suppress the exception here because it is valid that the entry does not
+    // exist
+    List<String> subclusters = null;
+    try {
+      subclusters = listDirRegistry(this.registry, this.user,
+          getRegistryKey(appId, null), false);
+    } catch (YarnException e) {
+      LOG.warn("Unexpected exception from listDirRegistry", e);
+    }
+
+    if (subclusters == null) {
+      LOG.info("Application {} does not exist in registry", appId);
+      return retMap;
+    }
+
+    // Read the amrmToken for each sub-cluster with an existing UAM
+    for (String scId : subclusters) {
+      LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId);
+      String key = getRegistryKey(appId, scId);
+      try {
+        String tokenString = readRegistry(this.registry, this.user, key, true);
+        if (tokenString == null) {
+          throw new YarnException("Null string from readRegistry key " + key);
+        }
+        Token<AMRMTokenIdentifier> amrmToken = new Token<>();
+        amrmToken.decodeFromUrlString(tokenString);
+        // Clear the service field, as if RM just issued the token
+        amrmToken.setService(new Text());
+
+        retMap.put(scId, amrmToken);
+      } catch (Exception e) {
+        LOG.error("Failed reading registry key " + key
+            + ", skipping subcluster " + scId, e);
+      }
+    }
+
+    // Override existing map if there
+    this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap));
+    return retMap;
+  }
+
+  /**
+   * Remove an application from registry.
+   *
+   * Nothing is removed when this client holds no token for the application
+   * in memory, that is when nothing was written or loaded for it here.
+   *
+   * @param appId application id
+   */
+  public void removeAppFromRegistry(ApplicationId appId) {
+    Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
+        this.appSubClusterTokenMap.get(appId);
+
+    if (subClusterTokenMap == null || subClusterTokenMap.isEmpty()) {
+      // Do not claim a removal in the log when nothing is going to be removed
+      LOG.debug("No registry entries known for {}, nothing to remove", appId);
+      return;
+    }
+    LOG.info("Removing all registry entries for {}", appId);
+
+    // Deleting the application directory recursively also removes every
+    // sub-cluster token entry stored below it
+    String key = getRegistryKey(appId, null);
+    try {
+      removeKeyRegistry(this.registry, this.user, key, true, true);
+      subClusterTokenMap.clear();
+    } catch (YarnException e) {
+      LOG.error("Failed removing registry directory key " + key, e);
+    }
+  }
+
+  /**
+   * Build the registry key for the base directory, for an application
+   * directory, or for one entry inside an application directory.
+   *
+   * @param appId application id, or null for the base directory itself
+   * @param fileName entry name below the application directory, or null for
+   *          the application directory itself; ignored when appId is null
+   * @return the registry key
+   */
+  private String getRegistryKey(ApplicationId appId, String fileName) {
+    if (appId == null) {
+      return this.registryBaseDir;
+    }
+    StringBuilder key = new StringBuilder();
+    key.append(this.registryBaseDir).append(appId.toString());
+    if (fileName != null) {
+      key.append("/").append(fileName);
+    }
+    return key.toString();
+  }
+
+  /**
+   * Read the description field of the service record stored under a key.
+   *
+   * @param registryImpl registry to read from
+   * @param ugi user to perform the read as
+   * @param key registry key to resolve
+   * @param throwIfFails when true, a failure is logged and a missing value is
+   *          reported as an exception; when false, null is returned quietly
+   * @return the stored value, or null when it cannot be read and
+   *         throwIfFails is false
+   * @throws YarnException if no value can be read and throwIfFails is true
+   */
+  private String readRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final boolean throwIfFails)
+      throws YarnException {
+    // Use the ugi loaded with app credentials to access registry
+    String result = ugi.doAs(new PrivilegedAction<String>() {
+      @Override
+      public String run() {
+        try {
+          ServiceRecord value = registryImpl.resolve(key);
+          if (value != null) {
+            return value.description;
+          }
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry resolve key " + key + " failed", e);
+          }
+        }
+        return null;
+      }
+    });
+    if (result == null && throwIfFails) {
+      throw new YarnException("Registry resolve key " + key + " failed");
+    }
+    return result;
+  }
+
+  /**
+   * Delete a registry key.
+   *
+   * @param registryImpl registry to delete from
+   * @param ugi user to perform the delete as
+   * @param key registry key to delete
+   * @param recursive whether entries below the key are deleted as well
+   * @param throwIfFails when true, a failure is logged and reported as an
+   *          exception; when false it is ignored quietly
+   * @throws YarnException if the delete fails and throwIfFails is true
+   */
+  private void removeKeyRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final boolean recursive,
+      final boolean throwIfFails) throws YarnException {
+    // Use the ugi loaded with app credentials to access registry
+    boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
+      @Override
+      public Boolean run() {
+        try {
+          registryImpl.delete(key, recursive);
+          return true;
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry remove key " + key + " failed", e);
+          }
+        }
+        return false;
+      }
+    });
+    if (!success && throwIfFails) {
+      throw new YarnException("Registry remove key " + key + " failed");
+    }
+  }
+
+  /**
+   * Write registry entry, override if exists. The value is stored in the
+   * description field of a new service record.
+   *
+   * @param registryImpl registry to write to
+   * @param ugi user to perform the write as
+   * @param key registry key to bind
+   * @param value value to store
+   * @param throwIfFails when true, a failure is logged and reported as an
+   *          exception; when false it is ignored quietly
+   * @throws YarnException if the write fails and throwIfFails is true
+   */
+  private void writeRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final String value,
+      final boolean throwIfFails) throws YarnException {
+
+    final ServiceRecord recordValue = new ServiceRecord();
+    recordValue.description = value;
+    // Use the ugi loaded with app credentials to access registry
+    boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
+      @Override
+      public Boolean run() {
+        try {
+          registryImpl.bind(key, recordValue, BindFlags.OVERWRITE);
+          return true;
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry write key " + key + " failed", e);
+          }
+        }
+        return false;
+      }
+    });
+    if (!success && throwIfFails) {
+      throw new YarnException("Registry write key " + key + " failed");
+    }
+  }
+
+  /**
+   * List the sub directories in the given directory.
+   *
+   * @param registryImpl registry to list from
+   * @param ugi user to perform the listing as
+   * @param key registry key of the directory
+   * @param throwIfFails when true, a failure is logged and reported as an
+   *          exception; when false, null is returned quietly
+   * @return the entries below the key, or null when the listing fails and
+   *         throwIfFails is false
+   * @throws YarnException if the listing fails and throwIfFails is true
+   */
+  private List<String> listDirRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final boolean throwIfFails)
+      throws YarnException {
+    List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
+      @Override
+      public List<String> run() {
+        try {
+          return registryImpl.list(key);
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry list key " + key + " failed", e);
+          }
+        }
+        return null;
+      }
+    });
+    if (result == null && throwIfFails) {
+      throw new YarnException("Registry list key " + key + " failed");
+    }
+    return result;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 08aee77fe6..677c4e65fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -44,9 +45,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
@@ -67,7 +69,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
// Map from uamId to UAM instances
private Map unmanagedAppMasterMap;
- private Map attemptIdMap;
+ private Map appIdMap;
private ExecutorService threadpool;
@@ -82,7 +84,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
this.threadpool = Executors.newCachedThreadPool();
}
this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
- this.attemptIdMap = new ConcurrentHashMap<>();
+ this.appIdMap = new ConcurrentHashMap<>();
super.serviceStart();
}
@@ -114,7 +116,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
public KillApplicationResponse call() throws Exception {
try {
LOG.info("Force-killing UAM id " + uamId + " for application "
- + attemptIdMap.get(uamId));
+ + appIdMap.get(uamId));
return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
@@ -132,7 +134,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
LOG.error("Failed to kill unmanaged application master", e);
}
}
- this.attemptIdMap.clear();
+ this.appIdMap.clear();
super.serviceStop();
}
@@ -145,13 +147,18 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
+ * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+ * recovery.
+ * @see ApplicationSubmissionContext
+ * #setKeepContainersAcrossApplicationAttempts(boolean)
* @return uamId for the UAM
* @throws YarnException if registerApplicationMaster fails
* @throws IOException if registerApplicationMaster fails
*/
public String createAndRegisterNewUAM(
RegisterApplicationMasterRequest registerRequest, Configuration conf,
- String queueName, String submitter, String appNameSuffix)
+ String queueName, String submitter, String appNameSuffix,
+ boolean keepContainersAcrossApplicationAttempts)
throws YarnException, IOException {
ApplicationId appId = null;
ApplicationClientProtocol rmClient;
@@ -173,45 +180,52 @@ public class UnmanagedAMPoolManager extends AbstractService {
rmClient = null;
}
- createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId,
- queueName, submitter, appNameSuffix);
+ // Launch the UAM in RM
+ launchUAM(appId.toString(), conf, appId, queueName, submitter,
+ appNameSuffix, keepContainersAcrossApplicationAttempts);
+
+ // Register the UAM application
+ registerApplicationMaster(appId.toString(), registerRequest);
+
+ // Returns the appId as uamId
return appId.toString();
}
/**
- * Create a new UAM and register the application, using the provided uamId and
- * appId.
+ * Launch a new UAM, using the provided uamId and appId.
*
- * @param uamId identifier for the UAM
- * @param registerRequest RegisterApplicationMasterRequest
+ * @param uamId uam Id
* @param conf configuration for this UAM
* @param appId application id for the UAM
* @param queueName queue of the application
* @param submitter submitter name of the UAM
* @param appNameSuffix application name suffix for the UAM
- * @return RegisterApplicationMasterResponse
- * @throws YarnException if registerApplicationMaster fails
- * @throws IOException if registerApplicationMaster fails
+ * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+ * recovery.
+ * @see ApplicationSubmissionContext
+ * #setKeepContainersAcrossApplicationAttempts(boolean)
+ * @return UAM token
+ * @throws YarnException if fails
+ * @throws IOException if fails
*/
- public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
- RegisterApplicationMasterRequest registerRequest, Configuration conf,
+ public Token launchUAM(String uamId, Configuration conf,
ApplicationId appId, String queueName, String submitter,
- String appNameSuffix) throws YarnException, IOException {
+ String appNameSuffix, boolean keepContainersAcrossApplicationAttempts)
+ throws YarnException, IOException {
if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists");
}
- UnmanagedApplicationManager uam =
- createUAM(conf, appId, queueName, submitter, appNameSuffix);
+ UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
+ submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
// Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam);
- RegisterApplicationMasterResponse response = null;
+ Token amrmToken = null;
try {
- LOG.info("Creating and registering UAM id {} for application {}", uamId,
- appId);
- response = uam.createAndRegisterApplicationMaster(registerRequest);
+ LOG.info("Launching UAM id {} for application {}", uamId, appId);
+ amrmToken = uam.launchUAM();
} catch (Exception e) {
// Add the map earlier and remove here if register failed because we want
// to make sure there is only one uam instance per uamId at any given time
@@ -219,8 +233,48 @@ public class UnmanagedAMPoolManager extends AbstractService {
throw e;
}
- this.attemptIdMap.put(uamId, uam.getAttemptId());
- return response;
+ this.appIdMap.put(uamId, uam.getAppId());
+ return amrmToken;
+ }
+
+  /**
+   * Re-attach to an existing UAM, using the provided uamIdentifier. The UAM
+   * is always created with the keep-containers flag set.
+   *
+   * @param uamId uam Id
+   * @param conf configuration for this UAM
+   * @param appId application id for the UAM
+   * @param queueName queue of the application
+   * @param submitter submitter name of the UAM
+   * @param appNameSuffix application name suffix for the UAM
+   * @param uamToken UAM token
+   * @throws YarnException if a UAM with this id already exists, or if the
+   *           re-attach fails
+   * @throws IOException if fails
+   */
+  public void reAttachUAM(String uamId, Configuration conf,
+      ApplicationId appId, String queueName, String submitter,
+      String appNameSuffix, Token<AMRMTokenIdentifier> uamToken)
+      throws YarnException, IOException {
+
+    if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " already exists");
+    }
+    UnmanagedApplicationManager uam =
+        createUAM(conf, appId, queueName, submitter, appNameSuffix, true);
+    // Put the UAM into map first before initializing it to avoid additional UAM
+    // for the same uamId being created concurrently
+    this.unmanagedAppMasterMap.put(uamId, uam);
+
+    try {
+      LOG.info("Reattaching UAM id {} for application {}", uamId, appId);
+      uam.reAttachUAM(uamToken);
+    } catch (Exception e) {
+      // Add the map earlier and remove here if re-attach failed because we
+      // want to make sure there is only one uam instance per uamId at any
+      // given time
+      this.unmanagedAppMasterMap.remove(uamId);
+      throw e;
+    }
+
+    this.appIdMap.put(uamId, uam.getAppId());
   }
/**
@@ -231,20 +285,42 @@ public class UnmanagedAMPoolManager extends AbstractService {
* @param queueName queue of the application
* @param submitter submitter name of the application
* @param appNameSuffix application name suffix
+ * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
* @return the UAM instance
*/
@VisibleForTesting
protected UnmanagedApplicationManager createUAM(Configuration conf,
ApplicationId appId, String queueName, String submitter,
- String appNameSuffix) {
+ String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
- appNameSuffix);
+ appNameSuffix, keepContainersAcrossApplicationAttempts);
+ }
+
+  /**
+   * Register application master for the UAM.
+   *
+   * @param uamId uam Id
+   * @param registerRequest RegisterApplicationMasterRequest
+   * @return register response
+   * @throws YarnException if the UAM does not exist or register fails
+   * @throws IOException if register fails
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String uamId, RegisterApplicationMasterRequest registerRequest)
+      throws YarnException, IOException {
+    // Look the UAM up once: a containsKey() check followed by a separate
+    // get() can return null if the entry is removed in between
+    UnmanagedApplicationManager uam = this.unmanagedAppMasterMap.get(uamId);
+    if (uam == null) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    LOG.info("Registering UAM id {} for application {}", uamId,
+        this.appIdMap.get(uamId));
+    return uam.registerApplicationMaster(registerRequest);
   }
/**
* AllocateAsync to an UAM.
*
- * @param uamId identifier for the UAM
+ * @param uamId uam Id
* @param request AllocateRequest
* @param callback callback for response
* @throws YarnException if allocate fails
@@ -262,7 +338,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
/**
* Finish an UAM/application.
*
- * @param uamId identifier for the UAM
+ * @param uamId uam Id
* @param request FinishApplicationMasterRequest
* @return FinishApplicationMasterResponse
* @throws YarnException if finishApplicationMaster call fails
@@ -274,14 +350,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
- LOG.info("Finishing application for UAM id {} ", uamId);
+ LOG.info("Finishing UAM id {} for application {}", uamId,
+ this.appIdMap.get(uamId));
FinishApplicationMasterResponse response =
this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
if (response.getIsUnregistered()) {
// Only remove the UAM when the unregister finished
this.unmanagedAppMasterMap.remove(uamId);
- this.attemptIdMap.remove(uamId);
+ this.appIdMap.remove(uamId);
LOG.info("UAM id {} is unregistered", uamId);
}
return response;
@@ -301,7 +378,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
/**
* Return whether an UAM exists.
*
- * @param uamId identifier for the UAM
+ * @param uamId uam Id
* @return UAM exists or not
*/
public boolean hasUAMId(String uamId) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 6531a75c95..3f4a1100b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -90,7 +92,6 @@ public class UnmanagedApplicationManager {
private AMRequestHandlerThread handlerThread;
private ApplicationMasterProtocol rmProxy;
private ApplicationId applicationId;
- private ApplicationAttemptId attemptId;
private String submitter;
private String appNameSuffix;
private Configuration conf;
@@ -101,9 +102,31 @@ public class UnmanagedApplicationManager {
private ApplicationClientProtocol rmClient;
private long asyncApiPollIntervalMillis;
private RecordFactory recordFactory;
+ private boolean keepContainersAcrossApplicationAttempts;
+ /*
+ * This flag is used as an indication that this method launchUAM/reAttachUAM
+ * is called (and perhaps blocked in initializeUnmanagedAM below due to RM
+ * connection/failover issue and not finished yet). Set the flag before
+ * calling the blocking call to RM.
+ */
+ private boolean connectionInitiated;
+
+ /**
+ * Constructor.
+ *
+ * @param conf configuration
+ * @param appId application Id to use for this UAM
+ * @param queueName the queue of the UAM
+ * @param submitter user name of the app
+ * @param appNameSuffix the app name suffix to use
+ * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+ * recovery. See {@link ApplicationSubmissionContext
+ * #setKeepContainersAcrossApplicationAttempts(boolean)}
+ */
public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
- String queueName, String submitter, String appNameSuffix) {
+ String queueName, String submitter, String appNameSuffix,
+ boolean keepContainersAcrossApplicationAttempts) {
Preconditions.checkNotNull(conf, "Configuration cannot be null");
Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
Preconditions.checkNotNull(submitter, "App submitter cannot be null");
@@ -116,6 +139,7 @@ public class UnmanagedApplicationManager {
this.handlerThread = new AMRequestHandlerThread();
this.requestQueue = new LinkedBlockingQueue<>();
this.rmProxy = null;
+ this.connectionInitiated = false;
this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
this.asyncApiPollIntervalMillis = conf.getLong(
@@ -123,45 +147,84 @@ public class UnmanagedApplicationManager {
YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YarnConfiguration.
DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+ this.keepContainersAcrossApplicationAttempts =
+ keepContainersAcrossApplicationAttempts;
+ }
+
+  /**
+   * Launch a new UAM in the resource manager.
+   *
+   * @return the AMRM token obtained for the launched UAM
+   * @throws YarnException if fails
+   * @throws IOException if fails
+   */
+  public Token<AMRMTokenIdentifier> launchUAM()
+      throws YarnException, IOException {
+    // Mark the connection as initiated before the blocking call below
+    this.connectionInitiated = true;
+
+    // Blocking call to RM
+    Token<AMRMTokenIdentifier> amrmToken =
+        initializeUnmanagedAM(this.applicationId);
+
+    // Creates the UAM connection
+    createUAMProxy(amrmToken);
+    return amrmToken;
+  }
+
+  /**
+   * Re-attach to an existing UAM in the resource manager. No new UAM is
+   * launched here; only the connection is created from the given token.
+   *
+   * @param amrmToken the UAM token
+   * @throws IOException if re-attach fails
+   * @throws YarnException if re-attach fails
+   */
+  public void reAttachUAM(Token<AMRMTokenIdentifier> amrmToken)
+      throws IOException, YarnException {
+    this.connectionInitiated = true;
+
+    // Creates the UAM connection
+    createUAMProxy(amrmToken);
+  }
+
+  /**
+   * Creates the proxy user for this UAM, named after the application id and
+   * layered on the current user, and then the ApplicationMasterProtocol proxy
+   * to the resource manager for that user and token.
+   *
+   * @param amrmToken the UAM token handed to the RM proxy
+   * @throws IOException if the current user cannot be obtained or the proxy
+   *           creation fails
+   */
+  protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken)
+      throws IOException {
+    this.userUgi = UserGroupInformation.createProxyUser(
+        this.applicationId.toString(), UserGroupInformation.getCurrentUser());
+    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
+        this.userUgi, amrmToken);
   }
/**
* Registers this {@link UnmanagedApplicationManager} with the resource
* manager.
*
- * @param request the register request
- * @return the register response
+ * @param request RegisterApplicationMasterRequest
+ * @return register response
* @throws YarnException if register fails
* @throws IOException if register fails
*/
- public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
+ public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
- // This need to be done first in this method, because it is used as an
- // indication that this method is called (and perhaps blocked due to RM
- // connection and not finished yet)
+ // Save the register request for re-register later
this.registerRequest = request;
- // attemptId will be available after this call
- UnmanagedAMIdentifier identifier =
- initializeUnmanagedAM(this.applicationId);
-
- try {
- this.userUgi = UserGroupInformation.createProxyUser(
- identifier.getAttemptId().toString(),
- UserGroupInformation.getCurrentUser());
- } catch (IOException e) {
- LOG.error("Exception while trying to get current user", e);
- throw new YarnRuntimeException(e);
- }
-
- this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
- this.userUgi, identifier.getToken());
-
- LOG.info("Registering the Unmanaged application master {}", this.attemptId);
+ // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
+ // We do not expect application already registered exception here
+ LOG.info("Registering the Unmanaged application master {}",
+ this.applicationId);
RegisterApplicationMasterResponse response =
this.rmProxy.registerApplicationMaster(this.registerRequest);
+ for (Container container : response.getContainersFromPreviousAttempts()) {
+ LOG.info("RegisterUAM returned existing running container "
+ + container.getId());
+ }
+ for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
+ LOG.info("RegisterUAM returned existing NM token for node "
+ + nmToken.getNodeId());
+ }
+
// Only when register succeed that we start the heartbeat thread
this.handlerThread.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
@@ -187,11 +250,11 @@ public class UnmanagedApplicationManager {
this.handlerThread.shutdown();
if (this.rmProxy == null) {
- if (this.registerRequest != null) {
- // This is possible if the async registerApplicationMaster is still
+ if (this.connectionInitiated) {
+ // This is possible if the async launchUAM is still
// blocked and retrying. Return a dummy response in this case.
LOG.warn("Unmanaged AM still not successfully launched/registered yet."
- + " Stopping the UAM client thread anyways.");
+ + " Stopping the UAM heartbeat thread anyways.");
return FinishApplicationMasterResponse.newInstance(false);
} else {
throw new YarnException("finishApplicationMaster should not "
@@ -199,7 +262,7 @@ public class UnmanagedApplicationManager {
}
}
return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
- this.registerRequest, this.attemptId);
+ this.registerRequest, this.applicationId);
}
/**
@@ -212,7 +275,7 @@ public class UnmanagedApplicationManager {
public KillApplicationResponse forceKillApplication()
throws IOException, YarnException {
KillApplicationRequest request =
- KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
+ KillApplicationRequest.newInstance(this.applicationId);
this.handlerThread.shutdown();
@@ -240,29 +303,29 @@ public class UnmanagedApplicationManager {
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
// Two possible cases why the UAM is not successfully registered yet:
- // 1. registerApplicationMaster is not called at all. Should throw here.
- // 2. registerApplicationMaster is called but hasn't successfully returned.
+ // 1. launchUAM is not called at all. Should throw here.
+ // 2. launchUAM is called but hasn't successfully returned.
//
// In case 2, we have already save the allocate request above, so if the
// registration succeed later, no request is lost.
if (this.rmProxy == null) {
- if (this.registerRequest != null) {
+ if (this.connectionInitiated) {
LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later.");
} else {
throw new YarnException(
- "AllocateAsync should not be called before createAndRegister");
+ "AllocateAsync should not be called before launchUAM");
}
}
}
/**
- * Returns the application attempt id of the UAM.
+ * Returns the application id of the UAM.
*
- * @return attempt id of the UAM
+ * @return application id of the UAM
*/
- public ApplicationAttemptId getAttemptId() {
- return this.attemptId;
+ public ApplicationId getAppId() {
+ return this.applicationId;
}
/**
@@ -287,15 +350,15 @@ public class UnmanagedApplicationManager {
* Launch and initialize an unmanaged AM. First, it creates a new application
* on the RM and negotiates a new attempt id. Then it waits for the RM
* application attempt state to reach YarnApplicationAttemptState.LAUNCHED
- * after which it returns the AM-RM token and the attemptId.
+ * after which it returns the AM-RM token.
*
* @param appId application id
- * @return the UAM identifier
+ * @return the UAM token
* @throws IOException if initialize fails
* @throws YarnException if initialize fails
*/
- protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
- throws IOException, YarnException {
+ protected Token initializeUnmanagedAM(
+ ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
@@ -306,13 +369,12 @@ public class UnmanagedApplicationManager {
submitUnmanagedApp(appId);
// Monitor the application attempt to wait for launch state
- ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
+ monitorCurrentAppAttempt(appId,
EnumSet.of(YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
YarnApplicationAttemptState.LAUNCHED);
- this.attemptId = attemptReport.getApplicationAttemptId();
- return getUAMIdentifier();
+ return getUAMToken();
} finally {
this.rmClient = null;
}
@@ -343,6 +405,8 @@ public class UnmanagedApplicationManager {
submitRequest.setApplicationSubmissionContext(context);
context.setUnmanagedAM(true);
+ context.setKeepContainersAcrossApplicationAttempts(
+ this.keepContainersAcrossApplicationAttempts);
LOG.info("Submitting unmanaged application {}", appId);
this.rmClient.submitApplication(submitRequest);
@@ -374,8 +438,10 @@ public class UnmanagedApplicationManager {
if (appStates.contains(state)) {
if (state != YarnApplicationState.ACCEPTED) {
throw new YarnRuntimeException(
- "Received non-accepted application state: " + state
- + ". Application " + appId + " not the first attempt?");
+ "Received non-accepted application state: " + state + " for "
+ + appId + ". This is likely because this is not the first "
+ + "app attempt in home sub-cluster, and AMRMProxy HA "
+ + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled.");
}
appAttemptId =
getApplicationReport(appId).getCurrentApplicationAttemptId();
@@ -415,25 +481,25 @@ public class UnmanagedApplicationManager {
}
/**
- * Gets the identifier of the unmanaged AM.
+ * Gets the amrmToken of the unmanaged AM.
*
- * @return the identifier of the unmanaged AM.
+ * @return the amrmToken of the unmanaged AM, or null if the report has none
* @throws IOException if getApplicationReport fails
* @throws YarnException if getApplicationReport fails
*/
- protected UnmanagedAMIdentifier getUAMIdentifier()
+ protected Token getUAMToken()
throws IOException, YarnException {
Token token = null;
org.apache.hadoop.yarn.api.records.Token amrmToken =
- getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
+ getApplicationReport(this.applicationId).getAMRMToken();
if (amrmToken != null) {
token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
} else {
LOG.warn(
"AMRMToken not found in the application report for application: {}",
- this.attemptId.getApplicationId());
+ this.applicationId);
}
- return new UnmanagedAMIdentifier(this.attemptId, token);
+ return token;
}
private ApplicationReport getApplicationReport(ApplicationId appId)
@@ -444,29 +510,6 @@ public class UnmanagedApplicationManager {
return this.rmClient.getApplicationReport(request).getApplicationReport();
}
- /**
- * Data structure that encapsulates the application attempt identifier and the
- * AMRMTokenIdentifier. Make it public because clients with HA need it.
- */
- public static class UnmanagedAMIdentifier {
- private ApplicationAttemptId attemptId;
- private Token token;
-
- public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
- Token token) {
- this.attemptId = attemptId;
- this.token = token;
- }
-
- public ApplicationAttemptId getAttemptId() {
- return this.attemptId;
- }
-
- public Token getToken() {
- return this.token;
- }
- }
-
/**
* Data structure that encapsulates AllocateRequest and AsyncCallback
* instance.
@@ -549,8 +592,10 @@ public class UnmanagedApplicationManager {
}
request.setResponseId(lastResponseId);
+
AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
- request, rmProxy, registerRequest, attemptId);
+ request, rmProxy, registerRequest, applicationId);
+
if (response == null) {
throw new YarnException("Null allocateResponse from allocate");
}
@@ -578,18 +623,17 @@ public class UnmanagedApplicationManager {
LOG.debug("Interrupted while waiting for queue", ex);
}
} catch (IOException ex) {
- LOG.warn(
- "IO Error occurred while processing heart beat for " + attemptId,
- ex);
+ LOG.warn("IO Error occurred while processing heart beat for "
+ + applicationId, ex);
} catch (Throwable ex) {
LOG.warn(
- "Error occurred while processing heart beat for " + attemptId,
+ "Error occurred while processing heart beat for " + applicationId,
ex);
}
}
LOG.info("UnmanagedApplicationManager has been stopped for {}. "
- + "AMRequestHandlerThread thread is exiting", attemptId);
+ + "AMRequestHandlerThread thread is exiting", applicationId);
}
}
@@ -600,8 +644,8 @@ public class UnmanagedApplicationManager {
implements UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Heartbeat thread {} for application attempt {} crashed!",
- t.getName(), attemptId, e);
+ LOG.error("Heartbeat thread {} for application {} crashed!",
+ t.getName(), applicationId, e);
}
}
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
index 7993bd8a5e..3cecdca552 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -63,16 +63,16 @@ public final class AMRMClientUtils {
/**
* Handle ApplicationNotRegistered exception and re-register.
*
- * @param attemptId app attemptId
+ * @param appId application Id
* @param rmProxy RM proxy instance
* @param registerRequest the AM re-register request
* @throws YarnException if re-register fails
*/
public static void handleNotRegisteredExceptionAndReRegister(
- ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
+ ApplicationId appId, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest) throws YarnException {
LOG.info("App attempt {} not registered, most likely due to RM failover. "
- + " Trying to re-register.", attemptId);
+ + " Trying to re-register.", appId);
try {
rmProxy.registerApplicationMaster(registerRequest);
} catch (Exception e) {
@@ -93,25 +93,24 @@ public final class AMRMClientUtils {
* @param request allocate request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
- * @param attemptId application attempt id
+ * @param appId application id
* @return allocate response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static AllocateResponse allocateWithReRegister(AllocateRequest request,
ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest,
- ApplicationAttemptId attemptId) throws YarnException, IOException {
+ RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+ throws YarnException, IOException {
try {
return rmProxy.allocate(request);
} catch (ApplicationMasterNotRegisteredException e) {
- handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+ handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// reset responseId after re-register
request.setResponseId(0);
// retry allocate
- return allocateWithReRegister(request, rmProxy, registerRequest,
- attemptId);
+ return allocateWithReRegister(request, rmProxy, registerRequest, appId);
}
}
@@ -123,23 +122,22 @@ public final class AMRMClientUtils {
* @param request finishApplicationMaster request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
- * @param attemptId application attempt id
+ * @param appId application id
* @return finishApplicationMaster response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static FinishApplicationMasterResponse finishAMWithReRegister(
FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
- RegisterApplicationMasterRequest registerRequest,
- ApplicationAttemptId attemptId) throws YarnException, IOException {
+ RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+ throws YarnException, IOException {
try {
return rmProxy.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException ex) {
- handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+ handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// retry finishAM after re-register
- return finishAMWithReRegister(request, rmProxy, registerRequest,
- attemptId);
+ return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 628c7819dc..b5727aa0fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -177,10 +178,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
LoggerFactory.getLogger(MockResourceManagerFacade.class);
private HashSet applicationMap = new HashSet<>();
- private HashMap> applicationContainerIdMap =
- new HashMap>();
- private HashMap allocatedContainerMap =
- new HashMap();
+ private HashSet keepContainerOnUams = new HashSet<>();
+ private HashMap>
+ applicationContainerIdMap = new HashMap<>();
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
private int subClusterId;
@@ -221,7 +221,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
this.isRunning = mode;
}
- private static String getAppIdentifier() throws IOException {
+ private static ApplicationAttemptId getAppIdentifier() throws IOException {
AMRMTokenIdentifier result = null;
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
Set tokenIds = remoteUgi.getTokenIdentifiers();
@@ -231,7 +231,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
break;
}
}
- return result != null ? result.getApplicationAttemptId().toString() : "";
+ return result != null ? result.getApplicationAttemptId()
+ : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
}
private void validateRunning() throws ConnectException {
@@ -246,19 +247,32 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throws YarnException, IOException {
validateRunning();
-
- String amrmToken = getAppIdentifier();
- LOG.info("Registering application attempt: " + amrmToken);
+ ApplicationAttemptId attemptId = getAppIdentifier();
+ LOG.info("Registering application attempt: " + attemptId);
shouldReRegisterNext = false;
+ List containersFromPreviousAttempt = null;
+
synchronized (applicationContainerIdMap) {
- if (applicationContainerIdMap.containsKey(amrmToken)) {
- throw new InvalidApplicationMasterRequestException(
- AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ if (applicationContainerIdMap.containsKey(attemptId)) {
+ if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
+ // For UAM with the keepContainersFromPreviousAttempt flag, return all
+ // running containers
+ containersFromPreviousAttempt = new ArrayList<>();
+ for (ContainerId containerId : applicationContainerIdMap
+ .get(attemptId)) {
+ containersFromPreviousAttempt.add(Container.newInstance(containerId,
+ null, null, null, null, null));
+ }
+ } else {
+ throw new InvalidApplicationMasterRequestException(
+ AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ }
+ } else {
+ // Keep track of the containers that are returned to this application
+ applicationContainerIdMap.put(attemptId, new ArrayList());
}
- // Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(amrmToken, new ArrayList());
}
// Make sure we wait for certain test cases last in the method
@@ -278,7 +292,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
- null, request.getHost(), null);
+ containersFromPreviousAttempt, request.getHost(), null);
}
@Override
@@ -288,8 +302,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
- String amrmToken = getAppIdentifier();
- LOG.info("Finishing application attempt: " + amrmToken);
+ ApplicationAttemptId attemptId = getAppIdentifier();
+ LOG.info("Finishing application attempt: " + attemptId);
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@@ -299,12 +313,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
- Assert.assertTrue("The application id is NOT registered: " + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List ids = applicationContainerIdMap.remove(amrmToken);
- for (ContainerId c : ids) {
- allocatedContainerMap.remove(c);
- }
+ Assert.assertTrue("The application id is NOT registered: " + attemptId,
+ applicationContainerIdMap.containsKey(attemptId));
+ applicationContainerIdMap.remove(attemptId);
}
return FinishApplicationMasterResponse.newInstance(
@@ -334,8 +345,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ "askList and releaseList in the same heartbeat");
}
- String amrmToken = getAppIdentifier();
- LOG.info("Allocate from application attempt: " + amrmToken);
+ ApplicationAttemptId attemptId = getAppIdentifier();
+ LOG.info("Allocate from application attempt: " + attemptId);
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@@ -367,16 +378,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// will need it in future
Assert.assertTrue(
"The application id is Not registered before allocate(): "
- + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List ids = applicationContainerIdMap.get(amrmToken);
+ + attemptId,
+ applicationContainerIdMap.containsKey(attemptId));
+ List ids = applicationContainerIdMap.get(attemptId);
ids.add(containerId);
- this.allocatedContainerMap.put(containerId, container);
}
}
}
}
+ List completedList = new ArrayList<>();
if (request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
LOG.info("Releasing containers: " + request.getReleaseList().size());
@@ -384,9 +395,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
Assert
.assertTrue(
"The application id is not registered before allocate(): "
- + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List ids = applicationContainerIdMap.get(amrmToken);
+ + attemptId,
+ applicationContainerIdMap.containsKey(attemptId));
+ List ids = applicationContainerIdMap.get(attemptId);
for (ContainerId id : request.getReleaseList()) {
boolean found = false;
@@ -402,18 +413,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ conf.get("AMRMTOKEN"), found);
ids.remove(id);
-
- // Return the released container back to the AM with new fake Ids. The
- // test case does not care about the IDs. The IDs are faked because
- // otherwise the LRM will throw duplication identifier exception. This
- // returning of fake containers is ONLY done for testing purpose - for
- // the test code to get confirmation that the sub-cluster resource
- // managers received the release request
- ContainerId fakeContainerId = ContainerId.newInstance(
- getApplicationAttemptId(1), containerIndex.incrementAndGet());
- Container fakeContainer = allocatedContainerMap.get(id);
- fakeContainer.setId(fakeContainerId);
- containerList.add(fakeContainer);
+ completedList.add(
+ ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0));
}
}
}
@@ -424,9 +425,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
- return AllocateResponse.newInstance(0, new ArrayList(),
- containerList, new ArrayList(), null, AMCommand.AM_RESYNC,
- 1, null, new ArrayList(), newAMRMToken,
+ return AllocateResponse.newInstance(0, completedList, containerList,
+ new ArrayList(), null, AMCommand.AM_RESYNC, 1, null,
+ new ArrayList(), newAMRMToken,
new ArrayList());
}
@@ -443,6 +444,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
report.setApplicationId(request.getApplicationId());
report.setCurrentApplicationAttemptId(
ApplicationAttemptId.newInstance(request.getApplicationId(), 1));
+ report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], ""));
response.setApplicationReport(report);
return response;
}
@@ -486,6 +488,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
LOG.info("Application submitted: " + appId);
applicationMap.add(appId);
+
+ if (request.getApplicationSubmissionContext().getUnmanagedAM()
+ || request.getApplicationSubmissionContext()
+ .getKeepContainersAcrossApplicationAttempts()) {
+ keepContainerOnUams.add(appId);
+ }
return SubmitApplicationResponse.newInstance();
}
@@ -502,6 +510,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
+ keepContainerOnUams.remove(appId);
}
LOG.info("Force killing application: " + appId);
return KillApplicationResponse.newInstance(true);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
new file mode 100644
index 0000000000..42be851512
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for FederationRegistryClient.
+ */
+public class TestFederationRegistryClient {
+ private Configuration conf;
+ private UserGroupInformation user;
+ private RegistryOperations registry;
+ private FederationRegistryClient registryClient;
+
+ @Before
+ public void setup() throws Exception {
+ this.conf = new YarnConfiguration();
+
+ this.registry = new FSRegistryOperationsService();
+ this.registry.init(this.conf);
+ this.registry.start();
+
+ this.user = UserGroupInformation.getCurrentUser();
+ this.registryClient =
+ new FederationRegistryClient(this.conf, this.registry, this.user);
+ this.registryClient.cleanAllApplications();
+ Assert.assertEquals(0, this.registryClient.getAllApplications().size());
+ }
+
+ @After
+ public void breakDown() {
+ registryClient.cleanAllApplications();
+ Assert.assertEquals(0, registryClient.getAllApplications().size());
+ registry.stop();
+ }
+
+ @Test
+ public void testBasicCase() {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ String scId1 = "subcluster1";
+ String scId2 = "subcluster2";
+
+ this.registryClient.writeAMRMTokenForUAM(appId, scId1,
+ new Token());
+ this.registryClient.writeAMRMTokenForUAM(appId, scId2,
+ new Token());
+ // Duplicate entry, should overwrite
+ this.registryClient.writeAMRMTokenForUAM(appId, scId1,
+ new Token());
+
+ Assert.assertEquals(1, this.registryClient.getAllApplications().size());
+ Assert.assertEquals(2,
+ this.registryClient.loadStateFromRegistry(appId).size());
+
+ this.registryClient.removeAppFromRegistry(appId);
+
+ Assert.assertEquals(0, this.registryClient.getAllApplications().size());
+ Assert.assertEquals(0,
+ this.registryClient.loadStateFromRegistry(appId).size());
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 9159cf7515..5848d3f8b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -65,7 +65,7 @@ public class TestUnmanagedApplicationManager {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
uam = new TestableUnmanagedApplicationManager(conf,
- attemptId.getApplicationId(), null, "submitter", "appNameSuffix");
+ attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
}
protected void waitForCallBackCountAndCheckZeroPending(
@@ -88,7 +88,8 @@ public class TestUnmanagedApplicationManager {
public void testBasicUsage()
throws YarnException, IOException, InterruptedException {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
@@ -102,11 +103,48 @@ public class TestUnmanagedApplicationManager {
attemptId);
}
+ /*
+ * Test re-attaching of an existing UAM. This is for HA of UAM client.
+ */
+ @Test(timeout = 5000)
+ public void testUAMReAttach()
+ throws YarnException, IOException, InterruptedException {
+
+ launchUAM(attemptId);
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+ // Wait for outstanding async allocate callback
+ waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+ MockResourceManagerFacade rmProxy = uam.getRMProxy();
+ uam = new TestableUnmanagedApplicationManager(conf,
+ attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
+ uam.setRMProxy(rmProxy);
+
+ reAttachUAM(null, attemptId);
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+ allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+ attemptId);
+
+ // Wait for outstanding async allocate callback
+ waitForCallBackCountAndCheckZeroPending(callback, 2);
+
+ finishApplicationMaster(
+ FinishApplicationMasterRequest.newInstance(null, null, null),
+ attemptId);
+ }
+
@Test(timeout = 5000)
public void testReRegister()
throws YarnException, IOException, InterruptedException {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.setShouldReRegisterNext();
@@ -137,7 +175,8 @@ public class TestUnmanagedApplicationManager {
@Override
public void run() {
try {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null),
attemptId);
} catch (Exception e) {
@@ -221,7 +260,8 @@ public class TestUnmanagedApplicationManager {
@Test
public void testForceKill()
throws YarnException, IOException, InterruptedException {
- createAndRegisterApplicationMaster(
+ launchUAM(attemptId);
+ registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.forceKillApplication();
@@ -241,19 +281,40 @@ public class TestUnmanagedApplicationManager {
return ugi;
}
- protected RegisterApplicationMasterResponse
- createAndRegisterApplicationMaster(
- final RegisterApplicationMasterRequest request,
- ApplicationAttemptId appAttemptId)
- throws YarnException, IOException, InterruptedException {
+ protected Token launchUAM(
+ ApplicationAttemptId appAttemptId)
+ throws IOException, InterruptedException {
+ return getUGIWithToken(appAttemptId)
+ .doAs(new PrivilegedExceptionAction>() {
+ @Override
+ public Token run() throws Exception {
+ return uam.launchUAM();
+ }
+ });
+ }
+
+ protected void reAttachUAM(final Token uamToken,
+ ApplicationAttemptId appAttemptId)
+ throws IOException, InterruptedException {
+ getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction