YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola

This commit is contained in:
Jian He 2016-03-27 20:22:12 -07:00
parent 49ff54c860
commit 55ae143923
4 changed files with 122 additions and 23 deletions

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
@ -512,6 +513,16 @@ public class AMRMProxyService extends AbstractService implements
return null;
}
@Private
public InetSocketAddress getBindAddress() {
return this.listenerEndpoint;
}
@Private
public AMRMProxyTokenSecretManager getSecretManager() {
return this.secretManager;
}
/**
* Private class for handling application stop events.
*
@ -546,7 +557,8 @@ public class AMRMProxyService extends AbstractService implements
* ApplicationAttemptId instances.
*
*/
private static class RequestInterceptorChainWrapper {
@Private
public static class RequestInterceptorChainWrapper {
private RequestInterceptor rootInterceptor;
private ApplicationAttemptId applicationAttemptId;

View File

@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Extends the AbstractRequestInterceptor class and provides an implementation
* that simply forwards the AM requests to the cluster resource manager.
@ -135,4 +137,9 @@ public final class DefaultRequestInterceptor extends
user.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
}
@VisibleForTesting
public void setRMClient(ApplicationMasterProtocol rmClient) {
this.rmClient = rmClient;
}
}

View File

@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements
private final ReadLock readLock;
private final WriteLock writeLock;
private AMRMProxyService amrmProxyService;
private boolean amrmProxyEnabled = false;
protected boolean amrmProxyEnabled = false;
private long waitForContainersOnShutdownMillis;
@ -247,19 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements
addService(sharedCacheUploader);
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
amrmProxyEnabled =
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
if (amrmProxyEnabled) {
LOG.info("AMRMProxyService is enabled. "
+ "All the AM->RM requests will be intercepted by the proxy");
this.amrmProxyService =
new AMRMProxyService(this.context, this.dispatcher);
addService(this.amrmProxyService);
} else {
LOG.info("AMRMProxyService is disabled");
}
createAMRMProxyService(conf);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
@ -272,8 +260,20 @@ public class ContainerManagerImpl extends CompositeService implements
recover();
}
public boolean isARMRMProxyEnabled() {
return amrmProxyEnabled;
protected void createAMRMProxyService(Configuration conf) {
this.amrmProxyEnabled =
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
if (amrmProxyEnabled) {
LOG.info("AMRMProxyService is enabled. "
+ "All the AM->RM requests will be intercepted by the proxy");
this.setAMRMProxyService(
new AMRMProxyService(this.context, this.dispatcher));
addService(this.getAMRMProxyService());
} else {
LOG.info("AMRMProxyService is disabled");
}
}
@SuppressWarnings("unchecked")
@ -810,9 +810,9 @@ public class ContainerManagerImpl extends CompositeService implements
// Initialize the AMRMProxy service instance only if the container is of
// type AM and if the AMRMProxy service is enabled
if (isARMRMProxyEnabled() && containerTokenIdentifier
.getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
this.amrmProxyService.processApplicationStartRequest(request);
if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
.equals(ContainerType.APPLICATION_MASTER)) {
this.getAMRMProxyService().processApplicationStartRequest(request);
}
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
@ -1413,4 +1413,15 @@ public class ContainerManagerImpl extends CompositeService implements
public Map<String, ByteBuffer> getAuxServiceMetaData() {
return this.auxiliaryServices.getMetaData();
}
@Private
public AMRMProxyService getAMRMProxyService() {
return this.amrmProxyService;
}
@Private
protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
this.amrmProxyService = amrmProxyService;
}
}

View File

@ -35,21 +35,23 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@ -698,6 +707,15 @@ public class MiniYARNCluster extends CompositeService {
protected void stopRMProxy() { }
};
}
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new CustomContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
}
}
/**
@ -799,4 +817,55 @@ public class MiniYARNCluster extends CompositeService {
public int getNumOfResourceManager() {
return this.resourceManagers.length;
}
private class CustomContainerManagerImpl extends ContainerManagerImpl {
public CustomContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService del, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
}
@Override
protected void createAMRMProxyService(Configuration conf) {
this.amrmProxyEnabled =
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
if (this.amrmProxyEnabled) {
LOG.info("CustomAMRMProxyService is enabled. "
+ "All the AM->RM requests will be intercepted by the proxy");
AMRMProxyService amrmProxyService =
useRpc ? new AMRMProxyService(getContext(), dispatcher)
: new ShortCircuitedAMRMProxy(getContext(), dispatcher);
this.setAMRMProxyService(amrmProxyService);
addService(this.getAMRMProxyService());
} else {
LOG.info("CustomAMRMProxyService is disabled");
}
}
}
private class ShortCircuitedAMRMProxy extends AMRMProxyService {
public ShortCircuitedAMRMProxy(Context context,
AsyncDispatcher dispatcher) {
super(context, dispatcher);
}
@Override
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
super.initializePipeline(applicationAttemptId, user, amrmToken,
localToken);
RequestInterceptor rt = getPipelines()
.get(applicationAttemptId.getApplicationId()).getRootInterceptor();
if (rt instanceof DefaultRequestInterceptor) {
((DefaultRequestInterceptor) rt)
.setRMClient(getResourceManager().getApplicationMasterService());
}
}
}
}