YARN-10229. [Federation] Client should be able to submit application to RM directly using normal client conf. Contributed by Bilwa S T.
This commit is contained in:
parent
c40cbc57fa
commit
eac558380f
@ -108,6 +108,8 @@ public class AMRMProxyService extends CompositeService implements
|
|||||||
private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
|
private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
|
||||||
private RegistryOperations registry;
|
private RegistryOperations registry;
|
||||||
private AMRMProxyMetrics metrics;
|
private AMRMProxyMetrics metrics;
|
||||||
|
private FederationStateStoreFacade federationFacade;
|
||||||
|
private boolean federationEnabled = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an instance of the service.
|
* Creates an instance of the service.
|
||||||
@ -144,7 +146,10 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
RegistryOperations.class);
|
RegistryOperations.class);
|
||||||
addService(this.registry);
|
addService(this.registry);
|
||||||
}
|
}
|
||||||
|
this.federationFacade = FederationStateStoreFacade.getInstance();
|
||||||
|
this.federationEnabled =
|
||||||
|
conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -389,13 +394,22 @@ public void processApplicationStartRequest(StartContainerRequest request)
|
|||||||
throws IOException, YarnException {
|
throws IOException, YarnException {
|
||||||
long startTime = clock.getTime();
|
long startTime = clock.getTime();
|
||||||
try {
|
try {
|
||||||
LOG.info("Callback received for initializing request "
|
|
||||||
+ "processing pipeline for an AM");
|
|
||||||
ContainerTokenIdentifier containerTokenIdentifierForKey =
|
ContainerTokenIdentifier containerTokenIdentifierForKey =
|
||||||
BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
|
BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
containerTokenIdentifierForKey.getContainerID()
|
containerTokenIdentifierForKey.getContainerID()
|
||||||
.getApplicationAttemptId();
|
.getApplicationAttemptId();
|
||||||
|
ApplicationId applicationID = appAttemptId.getApplicationId();
|
||||||
|
// Checking if application is there in federation state store only
|
||||||
|
// if federation is enabled. If
|
||||||
|
// application is submitted to router then it adds it in statestore.
|
||||||
|
// if application is not found in statestore that means its
|
||||||
|
// submitted to RM
|
||||||
|
if (!checkIfAppExistsInStateStore(applicationID)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Callback received for initializing request "
|
||||||
|
+ "processing pipeline for an AM");
|
||||||
Credentials credentials = YarnServerSecurityUtils
|
Credentials credentials = YarnServerSecurityUtils
|
||||||
.parseCredentials(request.getContainerLaunchContext());
|
.parseCredentials(request.getContainerLaunchContext());
|
||||||
|
|
||||||
@ -772,6 +786,21 @@ private RequestInterceptorChainWrapper getInterceptorChain(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean checkIfAppExistsInStateStore(ApplicationId applicationID) {
|
||||||
|
if (!federationEnabled) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Check if app is there in state store. If app is not there then it
|
||||||
|
// throws Exception
|
||||||
|
this.federationFacade.getApplicationHomeSubCluster(applicationID);
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private Token<AMRMTokenIdentifier> getFirstAMRMToken(
|
private Token<AMRMTokenIdentifier> getFirstAMRMToken(
|
||||||
Collection<Token<? extends TokenIdentifier>> allTokens) {
|
Collection<Token<? extends TokenIdentifier>> allTokens) {
|
||||||
|
@ -662,6 +662,27 @@ public void testAppRecoveryFailure() throws YarnException, Exception {
|
|||||||
Assert.assertEquals(0, state.getAppContexts().size());
|
Assert.assertEquals(0, state.getAppContexts().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckIfAppExistsInStateStore()
|
||||||
|
throws IOException, YarnException {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||||
|
|
||||||
|
createAndStartAMRMProxyService(conf);
|
||||||
|
|
||||||
|
Assert.assertEquals(false,
|
||||||
|
getAMRMProxyService().checkIfAppExistsInStateStore(appId));
|
||||||
|
|
||||||
|
Configuration distConf = createConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
|
||||||
|
|
||||||
|
createAndStartAMRMProxyService(distConf);
|
||||||
|
|
||||||
|
Assert.assertEquals(true,
|
||||||
|
getAMRMProxyService().checkIfAppExistsInStateStore(appId));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mock intercepter implementation that uses the same mockRM instance across
|
* A mock intercepter implementation that uses the same mockRM instance across
|
||||||
* restart.
|
* restart.
|
||||||
|
Loading…
Reference in New Issue
Block a user