diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 0e74cf6763..b2de68b506 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -989,6 +989,26 @@ public Collection getActiveSubClusters() } } + /** + * Get ApplicationSubmissionContext according to ApplicationId. + * We don't throw exceptions. If the application cannot be found, we return null. + * + * @param appId ApplicationId + * @return ApplicationSubmissionContext of ApplicationId + */ + public ApplicationSubmissionContext getApplicationSubmissionContext(ApplicationId appId) { + try { + GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest.newInstance(appId)); + ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster(); + return appHomeSubCluster.getApplicationSubmissionContext(); + } catch (Exception e) { + LOG.error("getApplicationSubmissionContext error, applicationId = {}.", appId, e); + return null; + } + } + + @VisibleForTesting public FederationCache getFederationCache() { return federationCache; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 78ad48d7f2..aa7a84b0c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -125,6 +125,7 @@ protected void serviceStop() throws Exception { * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * recovery. * @param rmName name of the YarnRM + * @param originalAppSubmissionContext ApplicationSubmissionContext * @see ApplicationSubmissionContext * #setKeepContainersAcrossApplicationAttempts(boolean) * @return uamId for the UAM @@ -134,7 +135,8 @@ protected void serviceStop() throws Exception { public String createAndRegisterNewUAM( RegisterApplicationMasterRequest registerRequest, Configuration conf, String queueName, String submitter, String appNameSuffix, - boolean keepContainersAcrossApplicationAttempts, String rmName) + boolean keepContainersAcrossApplicationAttempts, String rmName, + ApplicationSubmissionContext originalAppSubmissionContext) throws YarnException, IOException { ApplicationId appId = null; ApplicationClientProtocol rmClient; @@ -158,7 +160,8 @@ public String createAndRegisterNewUAM( // Launch the UAM in RM launchUAM(appId.toString(), conf, appId, queueName, submitter, - appNameSuffix, keepContainersAcrossApplicationAttempts, rmName); + appNameSuffix, keepContainersAcrossApplicationAttempts, rmName, + originalAppSubmissionContext); // Register the UAM application registerApplicationMaster(appId.toString(), registerRequest); @@ -179,6 +182,7 @@ public String createAndRegisterNewUAM( * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * recovery. * @param rmName name of the YarnRM + * @param originalAppSubmissionContext AppSubmissionContext * @see ApplicationSubmissionContext * #setKeepContainersAcrossApplicationAttempts(boolean) * @return UAM token @@ -188,14 +192,15 @@ public String createAndRegisterNewUAM( public Token launchUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, - String rmName) throws YarnException, IOException { + String rmName, ApplicationSubmissionContext originalAppSubmissionContext) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, - rmName); + rmName, originalAppSubmissionContext); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); @@ -226,19 +231,21 @@ public Token launchUAM(String uamId, Configuration conf, * @param appNameSuffix application name suffix for the UAM * @param uamToken UAM token * @param rmName name of the YarnRM + * @param originalAppSubmissionContext AppSubmissionContext * @throws YarnException if fails * @throws IOException if fails */ public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, - Token uamToken, String rmName) + Token uamToken, String rmName, + ApplicationSubmissionContext originalAppSubmissionContext) throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, - submitter, appNameSuffix, true, rmName); + submitter, appNameSuffix, true, rmName, originalAppSubmissionContext); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); @@ -266,15 +273,17 @@ public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId, * @param appNameSuffix application name suffix * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * @param rmName name of the YarnRM + * @param originalAppSubmissionContext ApplicationSubmissionContext * @return the UAM instance */ @VisibleForTesting protected UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, - String rmName) { + String rmName, ApplicationSubmissionContext originalAppSubmissionContext) { return new UnmanagedApplicationManager(conf, appId, queueName, submitter, - appNameSuffix, keepContainersAcrossApplicationAttempts, rmName); + appNameSuffix, keepContainersAcrossApplicationAttempts, rmName, + originalAppSubmissionContext); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 01f9bc7dbe..3121e6d44d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -99,6 +99,7 @@ public class UnmanagedApplicationManager { private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; private boolean keepContainersAcrossApplicationAttempts; + private ApplicationSubmissionContext applicationSubmissionContext; /* * This flag is used as an indication that this method launchUAM/reAttachUAM @@ -117,13 +118,15 @@ public class UnmanagedApplicationManager { * @param submitter user name of the app * @param appNameSuffix the app name suffix to use * @param rmName name of the YarnRM + * @param originalApplicationSubmissionContext ApplicationSubmissionContext * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * recovery. See {@link ApplicationSubmissionContext * #setKeepContainersAcrossApplicationAttempts(boolean)} */ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, - boolean keepContainersAcrossApplicationAttempts, String rmName) { + boolean keepContainersAcrossApplicationAttempts, String rmName, + ApplicationSubmissionContext originalApplicationSubmissionContext) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); Preconditions.checkNotNull(submitter, "App submitter cannot be null"); @@ -150,6 +153,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); this.keepContainersAcrossApplicationAttempts = keepContainersAcrossApplicationAttempts; + this.applicationSubmissionContext = originalApplicationSubmissionContext; } @VisibleForTesting @@ -429,6 +433,18 @@ private void submitUnmanagedApp(ApplicationId appId) Resource resource = Resources.createResource(1024); context.setResource(resource); context.setAMContainerSpec(amContainer); + if (applicationSubmissionContext != null) { + context.setApplicationType(applicationSubmissionContext.getApplicationType()); + context.setKeepContainersAcrossApplicationAttempts( + applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts()); + context.setApplicationTags(applicationSubmissionContext.getApplicationTags()); + context.setApplicationTimeouts(applicationSubmissionContext.getApplicationTimeouts()); + context.setLogAggregationContext(applicationSubmissionContext.getLogAggregationContext()); + context.setNodeLabelExpression(applicationSubmissionContext.getNodeLabelExpression()); + context.setApplicationSchedulingPropertiesMap( + applicationSubmissionContext.getApplicationSchedulingPropertiesMap()); + context.setPriority(applicationSubmissionContext.getPriority()); + } submitRequest.setApplicationSubmissionContext(context); context.setUnmanagedAM(true); @@ -551,4 +567,9 @@ protected void drainHeartbeatThread() { protected boolean isHeartbeatThreadAlive() { return this.heartbeatHandler.isAlive(); } + + @VisibleForTesting + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return applicationSubmissionContext; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index ddc85bf3e6..65266bf411 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -20,9 +20,12 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -36,8 +39,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -78,7 +84,7 @@ public void setup() { uam = new TestableUnmanagedApplicationManager(conf, attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, - "rm"); + "rm", null); threadpool = Executors.newCachedThreadPool(); uamPool = new TestableUnmanagedAMPoolManager(this.threadpool); @@ -434,8 +440,16 @@ public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, String rmName) { + this(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts, rmName, null); + } + + public TestableUnmanagedApplicationManager(Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, + String rmName, ApplicationSubmissionContext originalApplicationSubmissionContext) { super(conf, appId, queueName, submitter, appNameSuffix, - keepContainersAcrossApplicationAttempts, rmName); + keepContainersAcrossApplicationAttempts, rmName, originalApplicationSubmissionContext); } @Override @@ -505,9 +519,11 @@ public TestableUnmanagedAMPoolManager(ExecutorService threadpool) { @Override public UnmanagedApplicationManager createUAM(Configuration configuration, ApplicationId appId, String queueName, String submitter, String appNameSuffix, - boolean keepContainersAcrossApplicationAttempts, String rmId) { + boolean keepContainersAcrossApplicationAttempts, String rmId, + ApplicationSubmissionContext originalAppSubmissionContext) { return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter, - appNameSuffix, keepContainersAcrossApplicationAttempts, rmId); + appNameSuffix, keepContainersAcrossApplicationAttempts, rmId, + originalAppSubmissionContext); } } @@ -516,13 +532,13 @@ public void testSeparateThreadWithoutBlockServiceStop() throws Exception { ApplicationAttemptId attemptId1 = ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 1), 1); Token token1 = uamPool.launchUAM("SC-1", this.conf, - attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1"); + attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1", null); Assert.assertNotNull(token1); ApplicationAttemptId attemptId2 = ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 2), 1); Token token2 = uamPool.launchUAM("SC-2", this.conf, - attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2"); + attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2", null); Assert.assertNotNull(token2); Map unmanagedAppMasterMap = @@ -542,4 +558,40 @@ public void testSeparateThreadWithoutBlockServiceStop() throws Exception { 100, 2000); Assert.assertEquals(0, unmanagedAppMasterMap.size()); } + + @Test + public void testApplicationAttributes() + throws IOException, YarnException, InterruptedException, TimeoutException { + long now = Time.now(); + ApplicationId applicationId = ApplicationId.newInstance(now, 10); + ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext.newInstance( + applicationId, "test", "default", Priority.newInstance(10), null, true, true, 2, + Resource.newInstance(10, 2), "test"); + Set tags = Collections.singleton("1"); + appSubmissionContext.setApplicationTags(tags); + + Token token1 = uamPool.launchUAM("SC-1", this.conf, + applicationId, "default", "test-user", "SC-HOME", true, "SC-1", appSubmissionContext); + Assert.assertNotNull(token1); + + Map unmanagedAppMasterMap = + uamPool.getUnmanagedAppMasterMap(); + + UnmanagedApplicationManager uamApplicationManager = unmanagedAppMasterMap.get("SC-1"); + Assert.assertNotNull(uamApplicationManager); + + ApplicationSubmissionContext appSubmissionContextByUam = + uamApplicationManager.getApplicationSubmissionContext(); + + Assert.assertNotNull(appSubmissionContext); + Assert.assertEquals(10, appSubmissionContextByUam.getPriority().getPriority()); + Assert.assertEquals("test", appSubmissionContextByUam.getApplicationType()); + Assert.assertEquals(1, appSubmissionContextByUam.getApplicationTags().size()); + + uamPool.stop(); + Thread finishApplicationThread = uamPool.getFinishApplicationThread(); + GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(), + 100, 2000); + Assert.assertEquals(0, unmanagedAppMasterMap.size()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 0319867cb1..9460e70877 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; @@ -411,9 +412,12 @@ public void recover(Map recoveredDataMap) { FederationProxyProviderUtil.updateConfForFederation(config, keyScId); try { + ApplicationSubmissionContext originalSubmissionContext = + federationFacade.getApplicationSubmissionContext(applicationId); + // ReAttachUAM this.uamPool.reAttachUAM(keyScId, config, applicationId, queue, user, homeSCId, - tokens, keyScId); + tokens, keyScId, originalSubmissionContext); // GetAMRMClientRelayer this.secondaryRelayers.put(keyScId, this.uamPool.getAMRMClientRelayer(keyScId)); @@ -1026,10 +1030,13 @@ public RegisterApplicationMasterResponse call() throws Exception { FederationProxyProviderUtil.updateConfForFederation(config, subClusterId.getId()); + ApplicationSubmissionContext originalSubmissionContext = + federationFacade.getApplicationSubmissionContext(appId); + uamPool.reAttachUAM(subClusterId.getId(), config, appId, amRegistrationResponse.getQueue(), getApplicationContext().getUser(), homeSubClusterId.getId(), - amrmToken, subClusterId.toString()); + amrmToken, subClusterId.toString(), originalSubmissionContext); secondaryRelayers.put(subClusterId.getId(), uamPool.getAMRMClientRelayer(subClusterId.getId())); @@ -1266,11 +1273,15 @@ public void run() { RegisterApplicationMasterResponse uamResponse = null; Token token = null; try { + ApplicationId applicationId = attemptId.getApplicationId(); + ApplicationSubmissionContext originalSubmissionContext = + federationFacade.getApplicationSubmissionContext(applicationId); + // For appNameSuffix, use subClusterId of the home sub-cluster token = uamPool.launchUAM(subClusterId, config, - attemptId.getApplicationId(), amRegistrationResponse.getQueue(), + applicationId, amRegistrationResponse.getQueue(), getApplicationContext().getUser(), homeSubClusterId.toString(), - true, subClusterId); + true, subClusterId, originalSubmissionContext); secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 5ec47f1b52..a155874428 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -214,10 +215,10 @@ public TestableUnmanagedAMPoolManager(ExecutorService threadpool) { public UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, - String rmId) { + String rmId, ApplicationSubmissionContext originalAppSubmissionContext) { return new TestableUnmanagedApplicationManager(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, - rmId); + rmId, originalAppSubmissionContext); } } @@ -231,9 +232,9 @@ protected class TestableUnmanagedApplicationManager public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, - String rmName) { + String rmName, ApplicationSubmissionContext originalAppSubmissionContext) { super(conf, appId, queueName, submitter, appNameSuffix, - keepContainersAcrossApplicationAttempts, rmName); + keepContainersAcrossApplicationAttempts, rmName, originalAppSubmissionContext); } @Override