diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d3c4a1d528..fe278f3453 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -108,6 +108,8 @@ public class AMRMProxyService extends CompositeService implements private Map applPipelineMap; private RegistryOperations registry; private AMRMProxyMetrics metrics; + private FederationStateStoreFacade federationFacade; + private boolean federationEnabled = false; /** * Creates an instance of the service. @@ -144,7 +146,10 @@ protected void serviceInit(Configuration conf) throws Exception { RegistryOperations.class); addService(this.registry); } - + this.federationFacade = FederationStateStoreFacade.getInstance(); + this.federationEnabled = + conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED, + YarnConfiguration.DEFAULT_FEDERATION_ENABLED); super.serviceInit(conf); } @@ -389,13 +394,22 @@ public void processApplicationStartRequest(StartContainerRequest request) throws IOException, YarnException { long startTime = clock.getTime(); try { - LOG.info("Callback received for initializing request " - + "processing pipeline for an AM"); ContainerTokenIdentifier containerTokenIdentifierForKey = BuilderUtils.newContainerTokenIdentifier(request.getContainerToken()); ApplicationAttemptId appAttemptId = containerTokenIdentifierForKey.getContainerID() .getApplicationAttemptId(); + ApplicationId applicationID = appAttemptId.getApplicationId(); + // Checking if application is there in federation state store only + // if federation is enabled. If + // application is submitted to router then it adds it in statestore. + // if application is not found in statestore that means its + // submitted to RM + if (!checkIfAppExistsInStateStore(applicationID)) { + return; + } + LOG.info("Callback received for initializing request " + + "processing pipeline for an AM"); Credentials credentials = YarnServerSecurityUtils .parseCredentials(request.getContainerLaunchContext()); @@ -772,6 +786,21 @@ private RequestInterceptorChainWrapper getInterceptorChain( } } + boolean checkIfAppExistsInStateStore(ApplicationId applicationID) { + if (!federationEnabled) { + return true; + } + + try { + // Check if app is there in state store. If app is not there then it + // throws Exception + this.federationFacade.getApplicationHomeSubCluster(applicationID); + } catch (YarnException ex) { + return false; + } + return true; + } + @SuppressWarnings("unchecked") private Token getFirstAMRMToken( Collection> allTokens) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index b269fa4567..60e383870f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -662,6 +662,27 @@ public void testAppRecoveryFailure() throws YarnException, Exception { Assert.assertEquals(0, state.getAppContexts().size()); } + @Test + public void testCheckIfAppExistsInStateStore() + throws IOException, YarnException { + ApplicationId appId = ApplicationId.newInstance(0, 0); + Configuration conf = createConfiguration(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + + createAndStartAMRMProxyService(conf); + + Assert.assertEquals(false, + getAMRMProxyService().checkIfAppExistsInStateStore(appId)); + + Configuration distConf = createConfiguration(); + conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + + createAndStartAMRMProxyService(distConf); + + Assert.assertEquals(true, + getAMRMProxyService().checkIfAppExistsInStateStore(appId)); + } + /** * A mock intercepter implementation that uses the same mockRM instance across * restart.