diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index d2d6453635..959edafcf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -34,6 +34,7 @@ import java.util.Comparator; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -252,9 +253,13 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster(); - + SubClusterId homeSubClusterId = homeSubCluster.getHomeSubCluster(); + ApplicationSubmissionContext appSubmissionContext = homeSubCluster.getApplicationSubmissionContext(); ApplicationId appId = homeSubCluster.getApplicationId(); + LOG.info("appId = {}, homeSubClusterId = {}, appSubmissionContext = {}.", + appId, homeSubClusterId, appSubmissionContext); + if (!applications.containsKey(appId)) { applications.put(appId, homeSubCluster); } @@ -292,8 +297,20 @@ public class MemoryFederationStateStore implements FederationStateStore { "Application %s does not exist.", appId); } - return GetApplicationHomeSubClusterResponse.newInstance(appId, - applications.get(appId).getHomeSubCluster()); + // Whether the returned result contains context + ApplicationHomeSubCluster appHomeSubCluster = applications.get(appId); + ApplicationSubmissionContext submissionContext = + appHomeSubCluster.getApplicationSubmissionContext(); + boolean containsAppSubmissionContext = request.getContainsAppSubmissionContext(); + long creatTime = appHomeSubCluster.getCreateTime(); + SubClusterId homeSubClusterId = appHomeSubCluster.getHomeSubCluster(); + + if (containsAppSubmissionContext && submissionContext != null) { + return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubClusterId, creatTime, + submissionContext); + } + + return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubClusterId, creatTime); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java index 4a3b3a7fe9..1f966a40b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java @@ -77,6 +77,17 @@ public abstract class ApplicationHomeSubCluster { return appMapping; } + @Private + @Unstable + public static ApplicationHomeSubCluster newInstance(ApplicationId appId, + SubClusterId homeSubCluster, ApplicationSubmissionContext appSubmissionContext) { + ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class); + appMapping.setApplicationId(appId); + appMapping.setHomeSubCluster(homeSubCluster); + appMapping.setApplicationSubmissionContext(appSubmissionContext); + return appMapping; + } + /** * Get the {@link ApplicationId} representing the unique identifier of the * application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java index 6144b01e86..5a224bd007 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.util.Records; /** @@ -52,6 +53,31 @@ public abstract class GetApplicationHomeSubClusterResponse { return mapResponse; } + @Private + @Unstable + public static GetApplicationHomeSubClusterResponse newInstance( + ApplicationId appId, SubClusterId homeSubCluster, long createTime) { + ApplicationHomeSubCluster applicationHomeSubCluster = + ApplicationHomeSubCluster.newInstance(appId, createTime, homeSubCluster); + GetApplicationHomeSubClusterResponse mapResponse = + Records.newRecord(GetApplicationHomeSubClusterResponse.class); + mapResponse.setApplicationHomeSubCluster(applicationHomeSubCluster); + return mapResponse; + } + + @Private + @Unstable + public static GetApplicationHomeSubClusterResponse newInstance( + ApplicationId appId, SubClusterId homeSubCluster, long createTime, + ApplicationSubmissionContext context) { + ApplicationHomeSubCluster applicationHomeSubCluster = + ApplicationHomeSubCluster.newInstance(appId, createTime, homeSubCluster, context); + GetApplicationHomeSubClusterResponse mapResponse = + Records.newRecord(GetApplicationHomeSubClusterResponse.class); + mapResponse.setApplicationHomeSubCluster(applicationHomeSubCluster); + return mapResponse; + } + /** * Get the {@link ApplicationHomeSubCluster} representing the mapping of the * application to it's home sub-cluster. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index b4be7ca62c..0e74cf6763 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -38,7 +38,9 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -837,13 +839,16 @@ public final class FederationStateStoreFacade { * @param applicationId applicationId, is the id of the application. * @param subClusterId homeSubClusterId, this is selected by strategy. * @param retryCount number of retries. + * @param appSubmissionContext appSubmissionContext. * @throws YarnException yarn exception. */ public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId, - SubClusterId subClusterId, int retryCount) throws YarnException { + SubClusterId subClusterId, int retryCount, ApplicationSubmissionContext appSubmissionContext) + throws YarnException { Boolean exists = existsApplicationHomeSubCluster(applicationId); ApplicationHomeSubCluster appHomeSubCluster = - ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); + ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), + subClusterId, appSubmissionContext); if (!exists || retryCount == 0) { // persist the mapping of applicationId and the subClusterId which has // been selected as its home. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index ad09c03aaa..299e7001c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -33,6 +33,7 @@ import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -656,6 +657,16 @@ public abstract class FederationStateStoreBaseTest { stateStore.addApplicationHomeSubCluster(request); } + void addApplicationHomeSC(ApplicationId appId, SubClusterId subClusterId, + ApplicationSubmissionContext submissionContext) throws YarnException { + long createTime = Time.now(); + ApplicationHomeSubCluster ahsc = ApplicationHomeSubCluster.newInstance( + appId, createTime, subClusterId, submissionContext); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(ahsc); + stateStore.addApplicationHomeSubCluster(request); + } + private void setPolicyConf(String queue, String policyType) throws YarnException { SetSubClusterPolicyConfigurationRequest request = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 5548dab1b8..10612bdff4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -27,6 +31,10 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; @@ -88,4 +96,30 @@ public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier); assertEquals(identifier, tokenIdentifier); } + + @Test + public void testGetApplicationHomeSubClusterWithContext() throws Exception { + MemoryFederationStateStore memoryStateStore = + MemoryFederationStateStore.class.cast(this.getStateStore()); + + ApplicationId appId = ApplicationId.newInstance(1, 3); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance(appId, "test", "default", + Priority.newInstance(0), null, true, true, + 2, Resource.newInstance(10, 2), "test"); + addApplicationHomeSC(appId, subClusterId, context); + + GetApplicationHomeSubClusterRequest getRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId, true); + GetApplicationHomeSubClusterResponse result = + memoryStateStore.getApplicationHomeSubCluster(getRequest); + + assertEquals(appId, + result.getApplicationHomeSubCluster().getApplicationId()); + assertEquals(subClusterId, + result.getApplicationHomeSubCluster().getHomeSubCluster()); + assertEquals(context, + result.getApplicationHomeSubCluster().getApplicationSubmissionContext()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index dcd7777779..dff135bdb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationReque import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -764,4 +766,39 @@ public final class RouterServerUtil { return b.toByteArray(); } } + + /** + * Get trimmed version of ApplicationSubmissionContext to be saved to + * Federation State Store. + * + * @param actualContext actual ApplicationSubmissionContext. + * @return trimmed ApplicationSubmissionContext. + */ + @Private + @Unstable + public static ApplicationSubmissionContext getTrimmedAppSubmissionContext( + ApplicationSubmissionContext actualContext) { + if (actualContext == null) { + return null; + } + + // Set Basic information + ApplicationSubmissionContext trimmedContext = + Records.newRecord(ApplicationSubmissionContext.class); + trimmedContext.setApplicationId(actualContext.getApplicationId()); + trimmedContext.setApplicationName(actualContext.getApplicationName()); + trimmedContext.setQueue(actualContext.getQueue()); + trimmedContext.setPriority(actualContext.getPriority()); + trimmedContext.setApplicationType(actualContext.getApplicationType()); + trimmedContext.setNodeLabelExpression(actualContext.getNodeLabelExpression()); + trimmedContext.setLogAggregationContext(actualContext.getLogAggregationContext()); + trimmedContext.setApplicationTags(actualContext.getApplicationTags()); + trimmedContext.setApplicationSchedulingPropertiesMap( + actualContext.getApplicationSchedulingPropertiesMap()); + trimmedContext.setKeepContainersAcrossApplicationAttempts( + actualContext.getKeepContainersAcrossApplicationAttempts()); + trimmedContext.setApplicationTimeouts(actualContext.getApplicationTimeouts()); + + return trimmedContext; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 6cb4ff44e9..337b8fd8e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -565,8 +565,10 @@ public class FederationClientInterceptor // Step2. We Store the mapping relationship // between Application and HomeSubCluster in stateStore. + ApplicationSubmissionContext trimmedAppSubmissionContext = + RouterServerUtil.getTrimmedAppSubmissionContext(appSubmissionContext); federationFacade.addOrUpdateApplicationHomeSubCluster( - applicationId, subClusterId, retryCount); + applicationId, subClusterId, retryCount, trimmedAppSubmissionContext); // Step3. SubmitApplication to the subCluster ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 5d73ef20e5..f11ca9bc89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -559,8 +559,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Step2. We Store the mapping relationship // between Application and HomeSubCluster in stateStore. + ApplicationSubmissionContext trimmedAppSubmissionContext = + RouterServerUtil.getTrimmedAppSubmissionContext(context); federationFacade.addOrUpdateApplicationHomeSubCluster( - applicationId, subClusterId, retryCount); + applicationId, subClusterId, retryCount, trimmedAppSubmissionContext); // Step3. We get subClusterInfo based on subClusterId. SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);