YARN-3666. Federation Intercepting and propagating AM- home RM communications. (Botong Huang via Subru).
(cherry picked from commit 2399eb8200609246cb623c74450ca4a2032063cc)
This commit is contained in:
parent
d6c29b2f5b
commit
bed1832c93
@ -594,4 +594,11 @@
|
||||
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
|
||||
</Match>
|
||||
|
||||
<!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
|
||||
<Method name="registerApplicationMaster" />
|
||||
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
|
||||
</Match>
|
||||
|
||||
</FindBugsFilter>
|
||||
|
@ -0,0 +1,510 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Extends the AbstractRequestInterceptor and provides an implementation for
|
||||
* federation of YARN RM and scaling an application across multiple YARN
|
||||
* sub-clusters. All the federation specific implementation is encapsulated in
|
||||
* this class. This is always the last intercepter in the chain.
|
||||
*/
|
||||
public class FederationInterceptor extends AbstractRequestInterceptor {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationInterceptor.class);
|
||||
|
||||
/**
|
||||
* The home sub-cluster is the sub-cluster where the AM container is running
|
||||
* in.
|
||||
*/
|
||||
private ApplicationMasterProtocol homeRM;
|
||||
private SubClusterId homeSubClusterId;
|
||||
|
||||
/**
|
||||
* Used to keep track of the container Id and the sub cluster RM that created
|
||||
* the container, so that we know which sub-cluster to forward later requests
|
||||
* about existing containers to.
|
||||
*/
|
||||
private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;
|
||||
|
||||
/**
|
||||
* The original registration request that was sent by the AM. This instance is
|
||||
* reused to register/re-register with all the sub-cluster RMs.
|
||||
*/
|
||||
private RegisterApplicationMasterRequest amRegistrationRequest;
|
||||
|
||||
/**
|
||||
* The original registration response from home RM. This instance is reused
|
||||
* for duplicate register request from AM, triggered by timeout between AM and
|
||||
* AMRMProxy.
|
||||
*/
|
||||
private RegisterApplicationMasterResponse amRegistrationResponse;
|
||||
|
||||
/** The proxy ugi used to talk to home RM. */
|
||||
private UserGroupInformation appOwner;
|
||||
|
||||
/**
|
||||
* Creates an instance of the FederationInterceptor class.
|
||||
*/
|
||||
public FederationInterceptor() {
|
||||
this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
|
||||
this.amRegistrationResponse = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the instance using specified context.
|
||||
*/
|
||||
@Override
|
||||
public void init(AMRMProxyApplicationContext appContext) {
|
||||
super.init(appContext);
|
||||
LOG.info("Initializing Federation Interceptor");
|
||||
|
||||
// Update the conf if available
|
||||
Configuration conf = appContext.getConf();
|
||||
if (conf == null) {
|
||||
conf = getConf();
|
||||
} else {
|
||||
setConf(conf);
|
||||
}
|
||||
|
||||
try {
|
||||
this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
|
||||
UserGroupInformation.getCurrentUser());
|
||||
} catch (Exception ex) {
|
||||
throw new YarnRuntimeException(ex);
|
||||
}
|
||||
|
||||
this.homeSubClusterId =
|
||||
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
||||
this.homeRM = createHomeRMProxy(appContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the application master's registration request to the home RM.
|
||||
*
|
||||
* Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
|
||||
* so that when AM registers more than once, it returns the same register
|
||||
* success response instead of throwing
|
||||
* {@link InvalidApplicationMasterRequestException}. Furthermore, we present
|
||||
* to AM as if we are the RM that never fails over. When actual RM fails over,
|
||||
* we always re-register automatically.
|
||||
*
|
||||
* We did this because FederationInterceptor can receive concurrent register
|
||||
* requests from AM because of timeout between AM and AMRMProxy, which is
|
||||
* shorter than the timeout + failOver between FederationInterceptor
|
||||
* (AMRMProxy) and RM.
|
||||
*/
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
// If AM is calling with a different request, complain
|
||||
if (this.amRegistrationRequest != null
|
||||
&& !this.amRegistrationRequest.equals(request)) {
|
||||
throw new YarnException("A different request body recieved. AM should"
|
||||
+ " not call registerApplicationMaster with different request body");
|
||||
}
|
||||
|
||||
// Save the registration request. This will be used for registering with
|
||||
// secondary sub-clusters using UAMs, as well as re-register later
|
||||
this.amRegistrationRequest = request;
|
||||
|
||||
/*
|
||||
* Present to AM as if we are the RM that never fails over. When actual RM
|
||||
* fails over, we always re-register automatically.
|
||||
*
|
||||
* We did this because it is possible for AM to send duplicate register
|
||||
* request because of timeout. When it happens, it is fine to simply return
|
||||
* the success message. Out of all outstanding register threads, only the
|
||||
* last one will still have an unbroken RPC connection and successfully
|
||||
* return the response.
|
||||
*/
|
||||
if (this.amRegistrationResponse != null) {
|
||||
return this.amRegistrationResponse;
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a registration request to the home resource manager. Note that here
|
||||
* we don't register with other sub-cluster resource managers because that
|
||||
* will prevent us from using new sub-clusters that get added while the AM
|
||||
* is running and will breaks the elasticity feature. The registration with
|
||||
* the other sub-cluster RM will be done lazily as needed later.
|
||||
*/
|
||||
try {
|
||||
this.amRegistrationResponse =
|
||||
this.homeRM.registerApplicationMaster(request);
|
||||
} catch (InvalidApplicationMasterRequestException e) {
|
||||
if (e.getMessage()
|
||||
.contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
|
||||
// Some other register thread might have succeeded in the meantime
|
||||
if (this.amRegistrationResponse != null) {
|
||||
LOG.info("Other concurrent thread registered successfully, "
|
||||
+ "simply return the same success register response");
|
||||
return this.amRegistrationResponse;
|
||||
}
|
||||
}
|
||||
// This is a real issue, throw back to AM
|
||||
throw e;
|
||||
}
|
||||
|
||||
// the queue this application belongs will be used for getting
|
||||
// AMRMProxy policy from state store.
|
||||
String queue = this.amRegistrationResponse.getQueue();
|
||||
if (queue == null) {
|
||||
LOG.warn("Received null queue for application "
|
||||
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
|
||||
+ " from home subcluster. Will use default queue name "
|
||||
+ YarnConfiguration.DEFAULT_QUEUE_NAME
|
||||
+ " for getting AMRMProxyPolicy");
|
||||
} else {
|
||||
LOG.info("Application "
|
||||
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
|
||||
+ " belongs to queue " + queue);
|
||||
}
|
||||
|
||||
return this.amRegistrationResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the heart beats to the home RM and the secondary sub-cluster RMs that
|
||||
* are being used by the application.
|
||||
*/
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException {
|
||||
|
||||
try {
|
||||
// Split the heart beat request into multiple requests, one for each
|
||||
// sub-cluster RM that is used by this application.
|
||||
Map<SubClusterId, AllocateRequest> requests =
|
||||
splitAllocateRequest(request);
|
||||
|
||||
// Send the request to the home RM and get the response
|
||||
AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
|
||||
requests.get(this.homeSubClusterId), this.homeRM,
|
||||
this.amRegistrationRequest,
|
||||
getApplicationContext().getApplicationAttemptId());
|
||||
|
||||
// If the resource manager sent us a new token, add to the current user
|
||||
if (homeResponse.getAMRMToken() != null) {
|
||||
LOG.debug("Received new AMRMToken");
|
||||
YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
|
||||
this.appOwner, getConf());
|
||||
}
|
||||
|
||||
// Merge the responses from home and secondary sub-cluster RMs
|
||||
homeResponse = mergeAllocateResponses(homeResponse);
|
||||
|
||||
// return the final response to the application master.
|
||||
return homeResponse;
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Exception encountered while processing heart beat", ex);
|
||||
throw new YarnException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the finish application master request to all the resource managers
|
||||
* used by the application.
|
||||
*/
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
FinishApplicationMasterResponse homeResponse =
|
||||
AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
|
||||
this.amRegistrationRequest,
|
||||
getApplicationContext().getApplicationAttemptId());
|
||||
return homeResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextInterceptor(RequestInterceptor next) {
|
||||
throw new YarnRuntimeException(
|
||||
"setNextInterceptor is being called on FederationInterceptor. "
|
||||
+ "It should always be used as the last interceptor in the chain");
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called when the application pipeline is being destroyed. We will
|
||||
* release all the resources that we are holding in this call.
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
super.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns instance of the ApplicationMasterProtocol proxy class that is used
|
||||
* to connect to the Home resource manager.
|
||||
*
|
||||
* @param appContext AMRMProxyApplicationContext
|
||||
* @return the proxy created
|
||||
*/
|
||||
protected ApplicationMasterProtocol createHomeRMProxy(
|
||||
AMRMProxyApplicationContext appContext) {
|
||||
try {
|
||||
return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
|
||||
ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner,
|
||||
appContext.getAMRMToken());
|
||||
} catch (Exception ex) {
|
||||
throw new YarnRuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In federation, the heart beat request needs to be sent to all the sub
|
||||
* clusters from which the AM has requested containers. This method splits the
|
||||
* specified AllocateRequest from the AM and creates a new request for each
|
||||
* sub-cluster RM.
|
||||
*/
|
||||
private Map<SubClusterId, AllocateRequest> splitAllocateRequest(
|
||||
AllocateRequest request) throws YarnException {
|
||||
Map<SubClusterId, AllocateRequest> requestMap =
|
||||
new HashMap<SubClusterId, AllocateRequest>();
|
||||
|
||||
// Create heart beat request for home sub-cluster resource manager
|
||||
findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
|
||||
requestMap);
|
||||
|
||||
if (!isNullOrEmpty(request.getAskList())) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
this.homeSubClusterId, request, requestMap);
|
||||
newRequest.getAskList().addAll(request.getAskList());
|
||||
}
|
||||
|
||||
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
|
||||
request.getResourceBlacklistRequest().getBlacklistAdditions())) {
|
||||
for (String resourceName : request.getResourceBlacklistRequest()
|
||||
.getBlacklistAdditions()) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
this.homeSubClusterId, request, requestMap);
|
||||
newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
|
||||
.add(resourceName);
|
||||
}
|
||||
}
|
||||
|
||||
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
|
||||
request.getResourceBlacklistRequest().getBlacklistRemovals())) {
|
||||
for (String resourceName : request.getResourceBlacklistRequest()
|
||||
.getBlacklistRemovals()) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
this.homeSubClusterId, request, requestMap);
|
||||
newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
|
||||
.add(resourceName);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(request.getReleaseList())) {
|
||||
for (ContainerId cid : request.getReleaseList()) {
|
||||
if (warnIfNotExists(cid, "release")) {
|
||||
SubClusterId subClusterId =
|
||||
this.containerIdToSubClusterIdMap.get(cid);
|
||||
AllocateRequest newRequest = requestMap.get(subClusterId);
|
||||
newRequest.getReleaseList().add(cid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(request.getUpdateRequests())) {
|
||||
for (UpdateContainerRequest ucr : request.getUpdateRequests()) {
|
||||
if (warnIfNotExists(ucr.getContainerId(), "update")) {
|
||||
SubClusterId subClusterId =
|
||||
this.containerIdToSubClusterIdMap.get(ucr.getContainerId());
|
||||
AllocateRequest newRequest = requestMap.get(subClusterId);
|
||||
newRequest.getUpdateRequests().add(ucr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return requestMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the responses from other sub-clusters that we received
|
||||
* asynchronously with the specified home cluster response and keeps track of
|
||||
* the containers received from each sub-cluster resource managers.
|
||||
*/
|
||||
private AllocateResponse mergeAllocateResponses(
|
||||
AllocateResponse homeResponse) {
|
||||
// Timing issue, we need to remove the completed and then save the new ones.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Remove containers: "
|
||||
+ homeResponse.getCompletedContainersStatuses());
|
||||
LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
|
||||
}
|
||||
removeFinishedContainersFromCache(
|
||||
homeResponse.getCompletedContainersStatuses());
|
||||
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
|
||||
this.homeSubClusterId);
|
||||
|
||||
return homeResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the finished containers from the local cache.
|
||||
*/
|
||||
private void removeFinishedContainersFromCache(
|
||||
List<ContainerStatus> finishedContainers) {
|
||||
for (ContainerStatus container : finishedContainers) {
|
||||
if (containerIdToSubClusterIdMap
|
||||
.containsKey(container.getContainerId())) {
|
||||
containerIdToSubClusterIdMap.remove(container.getContainerId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add allocated containers to cache mapping.
|
||||
*/
|
||||
private void cacheAllocatedContainers(List<Container> containers,
|
||||
SubClusterId subClusterId) {
|
||||
for (Container container : containers) {
|
||||
if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
|
||||
SubClusterId existingSubClusterId =
|
||||
containerIdToSubClusterIdMap.get(container.getId());
|
||||
if (existingSubClusterId.equals(subClusterId)) {
|
||||
// When RM fails over, the new RM master might send out the same
|
||||
// container allocation more than once. Just move on in this case.
|
||||
LOG.warn(
|
||||
"Duplicate containerID: {} found in the allocated containers"
|
||||
+ " from same subcluster: {}, so ignoring.",
|
||||
container.getId(), subClusterId);
|
||||
} else {
|
||||
// The same container allocation from different subclusters,
|
||||
// something is wrong.
|
||||
// TODO: YARN-6667 if some subcluster RM is configured wrong, we
|
||||
// should not fail the entire heartbeat.
|
||||
throw new YarnRuntimeException(
|
||||
"Duplicate containerID found in the allocated containers. This"
|
||||
+ " can happen if the RM epoch is not configured properly."
|
||||
+ " ContainerId: " + container.getId().toString()
|
||||
+ " ApplicationId: "
|
||||
+ getApplicationContext().getApplicationAttemptId()
|
||||
+ " From RM: " + subClusterId
|
||||
+ " . Previous container was from subcluster: "
|
||||
+ existingSubClusterId);
|
||||
}
|
||||
}
|
||||
|
||||
containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if an AllocateRequest exists in the Map for the specified sub
|
||||
* cluster. If not found, create a new one, copy the value of responseId and
|
||||
* progress from the orignialAMRequest, save it in the specified Map and
|
||||
* return the new instance. If found, just return the old instance.
|
||||
*/
|
||||
private static AllocateRequest findOrCreateAllocateRequestForSubCluster(
|
||||
SubClusterId subClusterId, AllocateRequest originalAMRequest,
|
||||
Map<SubClusterId, AllocateRequest> requestMap) {
|
||||
AllocateRequest newRequest = null;
|
||||
if (requestMap.containsKey(subClusterId)) {
|
||||
newRequest = requestMap.get(subClusterId);
|
||||
} else {
|
||||
newRequest = createAllocateRequest();
|
||||
newRequest.setResponseId(originalAMRequest.getResponseId());
|
||||
newRequest.setProgress(originalAMRequest.getProgress());
|
||||
requestMap.put(subClusterId, newRequest);
|
||||
}
|
||||
|
||||
return newRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an empty AllocateRequest instance.
|
||||
*/
|
||||
private static AllocateRequest createAllocateRequest() {
|
||||
AllocateRequest request =
|
||||
AllocateRequest.newInstance(0, 0, null, null, null);
|
||||
request.setAskList(new ArrayList<ResourceRequest>());
|
||||
request.setReleaseList(new ArrayList<ContainerId>());
|
||||
ResourceBlacklistRequest blackList =
|
||||
ResourceBlacklistRequest.newInstance(null, null);
|
||||
blackList.setBlacklistAdditions(new ArrayList<String>());
|
||||
blackList.setBlacklistRemovals(new ArrayList<String>());
|
||||
request.setResourceBlacklistRequest(blackList);
|
||||
request.setUpdateRequests(new ArrayList<UpdateContainerRequest>());
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if the specified containerId exists in the cache and log an
|
||||
* error if not found.
|
||||
*
|
||||
* @param containerId the container id
|
||||
* @param actionName the name of the action
|
||||
* @return true if the container exists in the map, false otherwise
|
||||
*/
|
||||
private boolean warnIfNotExists(ContainerId containerId, String actionName) {
|
||||
if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
|
||||
LOG.error("AM is trying to {} a container {} that does not exist. ",
|
||||
actionName, containerId.toString());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to check if the specified Collection is null or empty
|
||||
*
|
||||
* @param c the collection object
|
||||
* @param <T> element type of the collection
|
||||
* @return whether is it is null or empty
|
||||
*/
|
||||
public static <T> boolean isNullOrEmpty(Collection<T> c) {
|
||||
return (c == null || c.size() == 0);
|
||||
}
|
||||
}
|
@ -42,6 +42,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
|
@ -0,0 +1,167 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Extends the TestAMRMProxyService and overrides methods in order to use the
|
||||
* AMRMProxyService's pipeline test cases for testing the FederationInterceptor
|
||||
* class. The tests for AMRMProxyService has been written cleverly so that it
|
||||
* can be reused to validate different request intercepter chains.
|
||||
*/
|
||||
public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestFederationInterceptor.class);
|
||||
|
||||
public static final String HOME_SC_ID = "SC-home";
|
||||
|
||||
private TestableFederationInterceptor interceptor;
|
||||
|
||||
private int testAppId;
|
||||
private ApplicationAttemptId attemptId;
|
||||
|
||||
@Override
|
||||
public void setUp() throws IOException {
|
||||
super.setUp();
|
||||
interceptor = new TestableFederationInterceptor();
|
||||
|
||||
testAppId = 1;
|
||||
attemptId = getApplicationAttemptId(testAppId);
|
||||
interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
|
||||
attemptId, "test-user", null, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() {
|
||||
interceptor.shutdown();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected YarnConfiguration createConfiguration() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||
String mockPassThroughInterceptorClass =
|
||||
PassThroughRequestInterceptor.class.getName();
|
||||
|
||||
// Create a request intercepter pipeline for testing. The last one in the
|
||||
// chain is the federation intercepter that calls the mock resource manager.
|
||||
// The others in the chain will simply forward it to the next one in the
|
||||
// chain
|
||||
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||
+ "," + TestableFederationInterceptor.class.getName());
|
||||
|
||||
conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestInterceptorChainCreation() throws Exception {
|
||||
RequestInterceptor root =
|
||||
super.getAMRMProxyService().createRequestInterceptorChain();
|
||||
int index = 0;
|
||||
while (root != null) {
|
||||
switch (index) {
|
||||
case 0:
|
||||
case 1:
|
||||
Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
|
||||
root.getClass().getName());
|
||||
break;
|
||||
case 2:
|
||||
Assert.assertEquals(TestableFederationInterceptor.class.getName(),
|
||||
root.getClass().getName());
|
||||
break;
|
||||
default:
|
||||
Assert.fail();
|
||||
}
|
||||
root = root.getNextInterceptor();
|
||||
index++;
|
||||
}
|
||||
Assert.assertEquals("The number of interceptors in chain does not match",
|
||||
Integer.toString(3), Integer.toString(index));
|
||||
}
|
||||
|
||||
/**
|
||||
* Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
|
||||
* so that when AM registers more than once, it returns the same register
|
||||
* success response instead of throwing
|
||||
* {@link InvalidApplicationMasterRequestException}
|
||||
*
|
||||
* We did this because FederationInterceptor can receive concurrent register
|
||||
* requests from AM because of timeout between AM and AMRMProxy. This can
|
||||
* possible since the timeout between FederationInterceptor and RM longer
|
||||
* because of performFailover + timeout.
|
||||
*/
|
||||
@Test
|
||||
public void testTwoIdenticalRegisterRequest() throws Exception {
|
||||
// Register the application twice
|
||||
RegisterApplicationMasterRequest registerReq =
|
||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
registerReq.setHost(Integer.toString(testAppId));
|
||||
registerReq.setRpcPort(testAppId);
|
||||
registerReq.setTrackingUrl("");
|
||||
|
||||
for (int i = 0; i < 2; i++) {
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
interceptor.registerApplicationMaster(registerReq);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoDifferentRegisterRequest() throws Exception {
|
||||
// Register the application first time
|
||||
RegisterApplicationMasterRequest registerReq =
|
||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
registerReq.setHost(Integer.toString(testAppId));
|
||||
registerReq.setRpcPort(testAppId);
|
||||
registerReq.setTrackingUrl("");
|
||||
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
interceptor.registerApplicationMaster(registerReq);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
|
||||
// Register the application second time with a different request obj
|
||||
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
registerReq.setHost(Integer.toString(testAppId));
|
||||
registerReq.setRpcPort(testAppId);
|
||||
registerReq.setTrackingUrl("different");
|
||||
try {
|
||||
registerResponse = interceptor.registerApplicationMaster(registerReq);
|
||||
Assert.fail("Should throw if a different request obj is used");
|
||||
} catch (YarnException e) {
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,133 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
|
||||
|
||||
/**
|
||||
* Extends the FederationInterceptor and overrides methods to provide a testable
|
||||
* implementation of FederationInterceptor.
|
||||
*/
|
||||
public class TestableFederationInterceptor extends FederationInterceptor {
|
||||
private ConcurrentHashMap<String, MockResourceManagerFacade>
|
||||
secondaryResourceManagers = new ConcurrentHashMap<>();
|
||||
private AtomicInteger runningIndex = new AtomicInteger(0);
|
||||
private MockResourceManagerFacade mockRm;
|
||||
|
||||
@Override
|
||||
protected ApplicationMasterProtocol createHomeRMProxy(
|
||||
AMRMProxyApplicationContext appContext) {
|
||||
synchronized (this) {
|
||||
if (mockRm == null) {
|
||||
mockRm = new MockResourceManagerFacade(
|
||||
new YarnConfiguration(super.getConf()), 0);
|
||||
}
|
||||
}
|
||||
return mockRm;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T createSecondaryRMProxy(Class<T> proxyClass,
|
||||
Configuration conf, String subClusterId) throws IOException {
|
||||
// We create one instance of the mock resource manager per sub cluster. Keep
|
||||
// track of the instances of the RMs in the map keyed by the sub cluster id
|
||||
synchronized (this.secondaryResourceManagers) {
|
||||
if (this.secondaryResourceManagers.contains(subClusterId)) {
|
||||
return (T) this.secondaryResourceManagers.get(subClusterId);
|
||||
} else {
|
||||
// The running index here is used to simulate different RM_EPOCH to
|
||||
// generate unique container identifiers in a federation environment
|
||||
MockResourceManagerFacade rm = new MockResourceManagerFacade(
|
||||
new Configuration(conf), runningIndex.addAndGet(10000));
|
||||
this.secondaryResourceManagers.put(subClusterId, rm);
|
||||
return (T) rm;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void setShouldReRegisterNext() {
|
||||
if (mockRm != null) {
|
||||
mockRm.setShouldReRegisterNext();
|
||||
}
|
||||
for (MockResourceManagerFacade subCluster : secondaryResourceManagers
|
||||
.values()) {
|
||||
subCluster.setShouldReRegisterNext();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extends the UnmanagedAMPoolManager and overrides methods to provide a
|
||||
* testable implementation of UnmanagedAMPoolManager.
|
||||
*/
|
||||
protected class TestableUnmanagedAMPoolManager
|
||||
extends UnmanagedAMPoolManager {
|
||||
public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
|
||||
super(threadpool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UnmanagedApplicationManager createUAM(Configuration conf,
|
||||
ApplicationId appId, String queueName, String submitter,
|
||||
String appNameSuffix) {
|
||||
return new TestableUnmanagedApplicationManager(conf, appId, queueName,
|
||||
submitter, appNameSuffix);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extends the UnmanagedApplicationManager and overrides methods to provide a
|
||||
* testable implementation.
|
||||
*/
|
||||
protected class TestableUnmanagedApplicationManager
|
||||
extends UnmanagedApplicationManager {
|
||||
|
||||
public TestableUnmanagedApplicationManager(Configuration conf,
|
||||
ApplicationId appId, String queueName, String submitter,
|
||||
String appNameSuffix) {
|
||||
super(conf, appId, queueName, submitter, appNameSuffix);
|
||||
}
|
||||
|
||||
/**
|
||||
* We override this method here to return a mock RM instances. The base
|
||||
* class returns the proxy to the real RM which will not work in case of
|
||||
* stand alone test cases.
|
||||
*/
|
||||
@Override
|
||||
protected <T> T createRMProxy(Class<T> protocol, Configuration config,
|
||||
UserGroupInformation user, Token<AMRMTokenIdentifier> token)
|
||||
throws IOException {
|
||||
return createSecondaryRMProxy(protocol, config,
|
||||
YarnConfiguration.getClusterId(config));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user