From 55ae1439233e8585d624b2872e1e4753ef63eebb Mon Sep 17 00:00:00 2001 From: Jian He Date: Sun, 27 Mar 2016 20:22:12 -0700 Subject: [PATCH] YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola --- .../amrmproxy/AMRMProxyService.java | 14 +++- .../amrmproxy/DefaultRequestInterceptor.java | 7 ++ .../ContainerManagerImpl.java | 49 +++++++----- .../hadoop/yarn/server/MiniYARNCluster.java | 75 ++++++++++++++++++- 4 files changed, 122 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index bd6538c99f..038c697b98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; @@ -512,6 +513,16 @@ private Token getFirstAMRMToken( return null; } + @Private + public InetSocketAddress getBindAddress() { + return this.listenerEndpoint; + } + + @Private + public AMRMProxyTokenSecretManager getSecretManager() { + return this.secretManager; + } + /** * Private class for handling application stop events. * @@ -546,7 +557,8 @@ public void handle(ApplicationEvent event) { * ApplicationAttemptId instances. * */ - private static class RequestInterceptorChainWrapper { + @Private + public static class RequestInterceptorChainWrapper { private RequestInterceptor rootInterceptor; private ApplicationAttemptId applicationAttemptId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 2c7939b009..4457dd8f44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -39,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Extends the AbstractRequestInterceptor class and provides an implementation * that simply forwards the AM requests to the cluster resource manager. @@ -135,4 +137,9 @@ private void updateAMRMToken(Token token) throws IOException { user.addToken(amrmToken); amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); } + + @VisibleForTesting + public void setRMClient(ApplicationMasterProtocol rmClient) { + this.rmClient = rmClient; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 94d5c1e6ed..8d09aa75bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements private final ReadLock readLock; private final WriteLock writeLock; private AMRMProxyService amrmProxyService; - private boolean amrmProxyEnabled = false; + protected boolean amrmProxyEnabled = false; private long waitForContainersOnShutdownMillis; @@ -247,19 +247,7 @@ public void serviceInit(Configuration conf) throws Exception { addService(sharedCacheUploader); dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); - amrmProxyEnabled = - conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, - YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); - - if (amrmProxyEnabled) { - LOG.info("AMRMProxyService is enabled. " - + "All the AM->RM requests will be intercepted by the proxy"); - this.amrmProxyService = - new AMRMProxyService(this.context, this.dispatcher); - addService(this.amrmProxyService); - } else { - LOG.info("AMRMProxyService is disabled"); - } + createAMRMProxyService(conf); waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, @@ -272,8 +260,20 @@ public void serviceInit(Configuration conf) throws Exception { recover(); } - public boolean isARMRMProxyEnabled() { - return amrmProxyEnabled; + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (amrmProxyEnabled) { + LOG.info("AMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + this.setAMRMProxyService( + new AMRMProxyService(this.context, this.dispatcher)); + addService(this.getAMRMProxyService()); + } else { + LOG.info("AMRMProxyService is disabled"); + } } @SuppressWarnings("unchecked") @@ -810,9 +810,9 @@ public StartContainersResponse startContainers( // Initialize the AMRMProxy service instance only if the container is of // type AM and if the AMRMProxy service is enabled - if (isARMRMProxyEnabled() && containerTokenIdentifier - .getContainerType().equals(ContainerType.APPLICATION_MASTER)) { - this.amrmProxyService.processApplicationStartRequest(request); + if (amrmProxyEnabled && containerTokenIdentifier.getContainerType() + .equals(ContainerType.APPLICATION_MASTER)) { + this.getAMRMProxyService().processApplicationStartRequest(request); } startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, @@ -1413,4 +1413,15 @@ public Context getContext() { public Map getAuxServiceMetaData() { return this.auxiliaryServices.getMetaData(); } + + @Private + public AMRMProxyService getAMRMProxyService() { + return this.amrmProxyService; + } + + @Private + protected void setAMRMProxyService(AMRMProxyService amrmProxyService) { + this.amrmProxyService = amrmProxyService; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 74b7732304..c933736b32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -35,21 +35,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -698,6 +707,15 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager( protected void stopRMProxy() { } }; } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, + LocalDirsHandlerService dirsHandler) { + return new CustomContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } } /** @@ -799,4 +817,55 @@ protected void doSecureLogin() throws IOException { public int getNumOfResourceManager() { return this.resourceManagers.length; } + + private class CustomContainerManagerImpl extends ContainerManagerImpl { + + public CustomContainerManagerImpl(Context context, ContainerExecutor exec, + DeletionService del, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { + super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); + } + + @Override + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (this.amrmProxyEnabled) { + LOG.info("CustomAMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + AMRMProxyService amrmProxyService = + useRpc ? new AMRMProxyService(getContext(), dispatcher) + : new ShortCircuitedAMRMProxy(getContext(), dispatcher); + this.setAMRMProxyService(amrmProxyService); + addService(this.getAMRMProxyService()); + } else { + LOG.info("CustomAMRMProxyService is disabled"); + } + } + } + + private class ShortCircuitedAMRMProxy extends AMRMProxyService { + + public ShortCircuitedAMRMProxy(Context context, + AsyncDispatcher dispatcher) { + super(context, dispatcher); + } + + @Override + protected void initializePipeline(ApplicationAttemptId applicationAttemptId, + String user, Token amrmToken, + Token localToken) { + super.initializePipeline(applicationAttemptId, user, amrmToken, + localToken); + RequestInterceptor rt = getPipelines() + .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); + if (rt instanceof DefaultRequestInterceptor) { + ((DefaultRequestInterceptor) rt) + .setRMClient(getResourceManager().getApplicationMasterService()); + } + } + + } }