From 7ef54faad4bee4346da082a3f8cc5d6ea405d74a Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 18 Jun 2013 04:02:47 +0000 Subject: [PATCH] YARN-834. Fixed annotations for yarn-client module, reorganized packages and clearly differentiated *Async apis. Contributed by Arun C Murthy and Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1494017 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/mapred/ResourceMgrDelegate.java | 160 +++++++++++- .../mapred/TestResourceMgrDelegate.java | 11 +- .../apache/hadoop/mapred/TestYARNRunner.java | 6 +- .../TestYarnClientProtocolProvider.java | 6 +- hadoop-yarn-project/CHANGES.txt | 4 + .../distributedshell/ApplicationMaster.java | 17 +- .../applications/distributedshell/Client.java | 2 +- .../UnmanagedAMLauncher.java | 3 +- .../yarn/client/{ => api}/AMRMClient.java | 7 +- .../yarn/client/{ => api}/NMClient.java | 5 +- .../yarn/client/{ => api}/YarnClient.java | 5 +- .../client/api/async/AMRMClientAsync.java | 245 ++++++++++++++++++ .../yarn/client/api/async/NMClientAsync.java | 235 +++++++++++++++++ .../async/impl/AMRMClientAsyncImpl.java} | 105 +------- .../async/impl/NMClientAsyncImpl.java} | 190 ++------------ .../client/api/async/impl/package-info.java | 21 ++ .../yarn/client/api/async/package-info.java | 21 ++ .../client/{ => api/impl}/AMRMClientImpl.java | 58 +++-- .../client/{ => api/impl}/NMClientImpl.java | 21 +- .../client/{ => api/impl}/YarnClientImpl.java | 19 +- .../yarn/client/api/impl/package-info.java | 21 ++ .../hadoop/yarn/client/api/package-info.java | 21 ++ .../yarn/client/cli/ApplicationCLI.java | 4 + .../hadoop/yarn/client/cli/NodeCLI.java | 4 + .../hadoop/yarn/client/cli/RMAdminCLI.java | 4 + .../hadoop/yarn/client/cli/YarnCLI.java | 7 +- .../hadoop/yarn/client/cli/package-info.java | 21 ++ .../async/impl}/TestAMRMClientAsync.java | 13 +- .../async/impl}/TestNMClientAsync.java | 51 ++-- .../client/{ => api/impl}/TestAMRMClient.java | 9 +- .../impl}/TestAMRMClientContainerRequest.java | 9 +- .../client/{ => api/impl}/TestNMClient.java | 16 +- .../client/{ => api/impl}/TestYarnClient.java | 4 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 2 +- 34 files changed, 945 insertions(+), 382 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{ => api}/AMRMClient.java (98%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{ => api}/NMClient.java (97%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{ => api}/YarnClient.java (98%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{AMRMClientAsync.java => api/async/impl/AMRMClientAsyncImpl.java} (76%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{NMClientAsync.java => api/async/impl/NMClientAsyncImpl.java} (76%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{ => api/impl}/AMRMClientImpl.java (92%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{ => api/impl}/NMClientImpl.java (95%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/{ => api/impl}/YarnClientImpl.java (96%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => api/async/impl}/TestAMRMClientAsync.java (95%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => api/async/impl}/TestNMClientAsync.java (91%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => api/impl}/TestAMRMClient.java (98%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => api/impl}/TestAMRMClientContainerRequest.java (91%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => api/impl}/TestNMClient.java (95%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => api/impl}/TestYarnClient.java (97%) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 349a019459..13b93014a3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -20,9 +20,12 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -40,34 +43,85 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; -import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ProtoUtils; -public class ResourceMgrDelegate extends YarnClientImpl { +import com.google.common.annotations.VisibleForTesting; + +public class ResourceMgrDelegate extends YarnClient { private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class); private YarnConfiguration conf; private GetNewApplicationResponse application; private ApplicationId applicationId; + @Private + @VisibleForTesting + protected YarnClient client; + private InetSocketAddress rmAddress; /** - * Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}. + * Delegate responsible for communicating with the Resource Manager's + * {@link ApplicationClientProtocol}. * @param conf the configuration object. */ public ResourceMgrDelegate(YarnConfiguration conf) { - super(); + this(conf, null); + } + + /** + * Delegate responsible for communicating with the Resource Manager's + * {@link ApplicationClientProtocol}. + * @param conf the configuration object. + * @param rmAddress the address of the Resource Manager + */ + public ResourceMgrDelegate(YarnConfiguration conf, + InetSocketAddress rmAddress) { + super(ResourceMgrDelegate.class.getName()); this.conf = conf; + this.rmAddress = rmAddress; + if (rmAddress == null) { + client = YarnClient.createYarnClient(); + } else { + client = YarnClient.createYarnClient(rmAddress); + } init(conf); start(); } + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (rmAddress == null) { + this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } + client.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + client.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + client.stop(); + super.serviceStop(); + } + public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { try { - return TypeConverter.fromYarnNodes(super.getNodeReports()); + return TypeConverter.fromYarnNodes(client.getNodeReports()); } catch (YarnException e) { throw new IOException(e); } @@ -75,7 +129,7 @@ public TaskTrackerInfo[] getActiveTrackers() throws IOException, public JobStatus[] getAllJobs() throws IOException, InterruptedException { try { - return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf); + return TypeConverter.fromYarnApps(client.getApplicationList(), this.conf); } catch (YarnException e) { throw new IOException(e); } @@ -91,7 +145,7 @@ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { try { - YarnClusterMetrics metrics = super.getYarnClusterMetrics(); + YarnClusterMetrics metrics = client.getYarnClusterMetrics(); ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, metrics.getNumNodeManagers() * 10, @@ -112,7 +166,7 @@ public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { try { return ProtoUtils.convertFromProtoFormat( - super.getRMDelegationToken(renewer), rmAddress); + client.getRMDelegationToken(renewer), rmAddress); } catch (YarnException e) { throw new IOException(e); } @@ -124,7 +178,7 @@ public String getFilesystemName() throws IOException, InterruptedException { public JobID getNewJobID() throws IOException, InterruptedException { try { - this.application = super.getNewApplication(); + this.application = client.getNewApplication(); this.applicationId = this.application.getApplicationId(); return TypeConverter.fromYarn(applicationId); } catch (YarnException e) { @@ -136,7 +190,7 @@ public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { try { org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = - super.getQueueInfo(queueName); + client.getQueueInfo(queueName); return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo, conf); } catch (YarnException e) { @@ -147,7 +201,7 @@ public QueueInfo getQueue(String queueName) throws IOException, public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { try { - return TypeConverter.fromYarnQueueUserAclsInfo(super + return TypeConverter.fromYarnQueueUserAclsInfo(client .getQueueAclsInfo()); } catch (YarnException e) { throw new IOException(e); @@ -156,7 +210,7 @@ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, public QueueInfo[] getQueues() throws IOException, InterruptedException { try { - return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf); + return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf); } catch (YarnException e) { throw new IOException(e); } @@ -164,7 +218,7 @@ public QueueInfo[] getQueues() throws IOException, InterruptedException { public QueueInfo[] getRootQueues() throws IOException, InterruptedException { try { - return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), + return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(), this.conf); } catch (YarnException e) { throw new IOException(e); @@ -174,7 +228,7 @@ public QueueInfo[] getRootQueues() throws IOException, InterruptedException { public QueueInfo[] getChildQueues(String parent) throws IOException, InterruptedException { try { - return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent), + return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent), this.conf); } catch (YarnException e) { throw new IOException(e); @@ -216,4 +270,82 @@ public long getProtocolVersion(String arg0, long arg1) throws IOException { public ApplicationId getApplicationId() { return applicationId; } + + @Override + public GetNewApplicationResponse getNewApplication() throws YarnException, + IOException { + return client.getNewApplication(); + } + + @Override + public ApplicationId + submitApplication(ApplicationSubmissionContext appContext) + throws YarnException, IOException { + return client.submitApplication(appContext); + } + + @Override + public void killApplication(ApplicationId applicationId) + throws YarnException, IOException { + client.killApplication(applicationId); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + return client.getApplicationReport(appId); + } + + @Override + public List getApplicationList() throws YarnException, + IOException { + return client.getApplicationList(); + } + + @Override + public YarnClusterMetrics getYarnClusterMetrics() throws YarnException, + IOException { + return client.getYarnClusterMetrics(); + } + + @Override + public List getNodeReports() throws YarnException, IOException { + return client.getNodeReports(); + } + + @Override + public org.apache.hadoop.yarn.api.records.Token getRMDelegationToken( + Text renewer) throws YarnException, IOException { + return client.getRMDelegationToken(renewer); + } + + @Override + public org.apache.hadoop.yarn.api.records.QueueInfo getQueueInfo( + String queueName) throws YarnException, IOException { + return client.getQueueInfo(queueName); + } + + @Override + public List getAllQueues() + throws YarnException, IOException { + return client.getAllQueues(); + } + + @Override + public List getRootQueueInfos() + throws YarnException, IOException { + return client.getRootQueueInfos(); + } + + @Override + public List getChildQueueInfos( + String parent) throws YarnException, IOException { + return client.getChildQueueInfos(parent); + } + + @Override + public List getQueueAclsInfo() throws YarnException, + IOException { + return client.getQueueAclsInfo(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java index 816804d671..2339fb58d2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; @@ -67,8 +68,9 @@ public void testGetRootQueues() throws IOException, InterruptedException { ResourceMgrDelegate delegate = new ResourceMgrDelegate( new YarnConfiguration()) { @Override - protected void serviceStart() { - this.rmClient = applicationsManager; + protected void serviceStart() throws Exception { + Assert.assertTrue(this.client instanceof YarnClientImpl); + ((YarnClientImpl) this.client).setRMClient(applicationsManager); } }; delegate.getRootQueues(); @@ -110,8 +112,9 @@ public void tesAllJobs() throws Exception { ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate( new YarnConfiguration()) { @Override - protected void serviceStart() { - this.rmClient = applicationsManager; + protected void serviceStart() throws Exception { + Assert.assertTrue(this.client instanceof YarnClientImpl); + ((YarnClientImpl) this.client).setRMClient(applicationsManager); } }; JobStatus[] allJobs = resourceMgrDelegate.getAllJobs(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 0ccfcc306a..0046c9b330 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -200,8 +201,9 @@ public void testResourceMgrDelegate() throws Exception { final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class); ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) { @Override - protected void serviceStart() { - this.rmClient = clientRMProtocol; + protected void serviceStart() throws Exception { + assertTrue(this.client instanceof YarnClientImpl); + ((YarnClientImpl) this.client).setRMClient(clientRMProtocol); } }; /* make sure kill calls finish application master */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java index 289d0f21e5..aeb20cd449 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -110,8 +111,9 @@ public void testClusterGetDelegationToken() throws Exception { ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate( new YarnConfiguration(conf)) { @Override - protected void serviceStart() { - this.rmClient = cRMProtocol; + protected void serviceStart() throws Exception { + assertTrue(this.client instanceof YarnClientImpl); + ((YarnClientImpl) this.client).setRMClient(cRMProtocol); } }; yrunner.setResourceMgrDelegate(rmgrDelegate); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index af11848864..991a30fec5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -186,6 +186,10 @@ Release 2.1.0-beta - UNRELEASED YARN-610. ClientToken is no longer set in the environment of the Containers. (Omkar Vinit Joshi via vinodkv) + YARN-834. Fixed annotations for yarn-client module, reorganized packages and + clearly differentiated *Async apis. (Arun C Murthy and Zhijie Shen via + vinodkv) + NEW FEATURES YARN-482. FS: Extend SchedulingMode to intermediate queues. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 64b4d6f21c..6b7768bdc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -67,9 +67,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.AMRMClientAsync; -import org.apache.hadoop.yarn.client.NMClientAsync; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -436,17 +436,18 @@ private void printUsage(Options opts) { * @throws YarnException * @throws IOException */ - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "unchecked" }) public boolean run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); - resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener); + resourceManager = + AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener); resourceManager.init(conf); resourceManager.start(); containerListener = new NMCallbackHandler(); - nmClientAsync = new NMClientAsync(containerListener); + nmClientAsync = NMClientAsync.createNMClientAsync(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); @@ -682,7 +683,7 @@ public void onContainerStarted(ContainerId containerId, } Container container = containers.get(containerId); if (container != null) { - nmClientAsync.getContainerStatus(containerId, container.getNodeId(), + nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(), container.getContainerToken()); } } @@ -802,7 +803,7 @@ public void run() { ctx.setCommands(commands); containerListener.addContainer(container.getId(), container); - nmClientAsync.startContainer(container, ctx); + nmClientAsync.startContainerAsync(container, ctx); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 6046612ed9..9e14ca4404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; -import org.apache.hadoop.yarn.client.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java index 275df8df14..b209d95a73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java @@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.YarnClient; -import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java similarity index 98% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 373cbb35d2..e497005470 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api; import java.io.IOException; import java.util.Collection; @@ -36,12 +36,13 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.collect.ImmutableList; @InterfaceAudience.Public -@InterfaceStability.Unstable +@InterfaceStability.Stable public abstract class AMRMClient extends AbstractService { @@ -53,7 +54,7 @@ public abstract class AMRMClient extends * AMRMClient.createAMRMClientContainerRequest(appAttemptId) * } * @param appAttemptId the appAttemptId associated with the AMRMClient - * @return the newly created AMRMClient instance. + * @return the newly create AMRMClient instance. */ @Public public static AMRMClient createAMRMClient( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 9dd3ebda6b..e628745e0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api; import java.io.IOException; import java.nio.ByteBuffer; @@ -33,10 +33,11 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @InterfaceAudience.Public -@InterfaceStability.Unstable +@InterfaceStability.Stable public abstract class NMClient extends AbstractService { /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java similarity index 98% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 57ef8054fc..751c4ba809 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api; import java.io.IOException; import java.net.InetSocketAddress; @@ -37,10 +37,11 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceStability.Stable public abstract class YarnClient extends AbstractService { /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java new file mode 100644 index 0000000000..d24750efbb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -0,0 +1,245 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client.api.async; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.google.common.annotations.VisibleForTesting; + +/** + * AMRMClientAsync handles communication with the ResourceManager + * and provides asynchronous updates on events such as container allocations and + * completions. It contains a thread that sends periodic heartbeats to the + * ResourceManager. + * + * It should be used by implementing a CallbackHandler: + *
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ *   public void onContainersAllocated(List containers) {
+ *     [run tasks on the containers]
+ *   }
+ *   
+ *   public void onContainersCompleted(List statuses) {
+ *     [update progress, check whether app is done]
+ *   }
+ *   
+ *   public void onNodesUpdated(List updated) {}
+ *   
+ *   public void onReboot() {}
+ * }
+ * }
+ * 
+ * + * The client's lifecycle should be managed similarly to the following: + * + *
+ * {@code
+ * AMRMClientAsync asyncClient = 
+ *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ *       appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * 
+ */ +@Public +@Stable +public abstract class AMRMClientAsync +extends AbstractService { + + protected final AMRMClient client; + protected final CallbackHandler handler; + protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); + + public static AMRMClientAsync + createAMRMClientAsync( + ApplicationAttemptId id, + int intervalMs, + CallbackHandler callbackHandler) { + return new AMRMClientAsyncImpl(id, intervalMs, callbackHandler); + } + + public static AMRMClientAsync + createAMRMClientAsync( + AMRMClient client, + int intervalMs, + CallbackHandler callbackHandler) { + return new AMRMClientAsyncImpl(client, intervalMs, callbackHandler); + } + + protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs, + CallbackHandler callbackHandler) { + this(new AMRMClientImpl(id), intervalMs, callbackHandler); + } + + @Private + @VisibleForTesting + protected AMRMClientAsync(AMRMClient client, int intervalMs, + CallbackHandler callbackHandler) { + super(AMRMClientAsync.class.getName()); + this.client = client; + this.heartbeatIntervalMs.set(intervalMs); + this.handler = callbackHandler; + } + + public void setHeartbeatInterval(int interval) { + heartbeatIntervalMs.set(interval); + } + + public abstract List> getMatchingRequests( + Priority priority, + String resourceName, + Resource capability); + + /** + * Registers this application master with the resource manager. On successful + * registration, starts the heartbeating thread. + * @throws YarnException + * @throws IOException + */ + public abstract RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl) + throws YarnException, IOException; + + /** + * Unregister the application master. This must be called in the end. + * @param appStatus Success/Failure status of the master + * @param appMessage Diagnostics message on failure + * @param appTrackingUrl New URL to get master info + * @throws YarnException + * @throws IOException + */ + public abstract void unregisterApplicationMaster( + FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) + throws YarnException, IOException; + + /** + * Request containers for resources before calling allocate + * @param req Resource request + */ + public abstract void addContainerRequest(T req); + + /** + * Remove previous container request. The previous container request may have + * already been sent to the ResourceManager. So even after the remove request + * the app must be prepared to receive an allocation for the previous request + * even after the remove request + * @param req Resource request + */ + public abstract void removeContainerRequest(T req); + + /** + * Release containers assigned by the Resource Manager. If the app cannot use + * the container or wants to give up the container then it can release them. + * The app needs to make new requests for the released resource capability if + * it still needs it. eg. it released non-local resources + * @param containerId + */ + public abstract void releaseAssignedContainer(ContainerId containerId); + + /** + * Get the currently available resources in the cluster. + * A valid value is available after a call to allocate has been made + * @return Currently available resources + */ + public abstract Resource getClusterAvailableResources(); + + /** + * Get the current number of nodes in the cluster. + * A valid values is available after a call to allocate has been made + * @return Current number of nodes in the cluster + */ + public abstract int getClusterNodeCount(); + + /** + * It returns the NMToken received on allocate call. It will not communicate + * with RM to get NMTokens. On allocate call whenever we receive new token + * along with new container AMRMClientAsync will cache this NMToken per node + * manager. This map returned should be shared with any application which is + * communicating with NodeManager (ex. NMClient / NMClientAsync) using + * NMTokens. If a new NMToken is received for the same node manager + * then it will be replaced. + */ + public abstract ConcurrentMap getNMTokens(); + + public interface CallbackHandler { + + /** + * Called when the ResourceManager responds to a heartbeat with completed + * containers. If the response contains both completed containers and + * allocated containers, this will be called before containersAllocated. + */ + public void onContainersCompleted(List statuses); + + /** + * Called when the ResourceManager responds to a heartbeat with allocated + * containers. If the response containers both completed containers and + * allocated containers, this will be called after containersCompleted. + */ + public void onContainersAllocated(List containers); + + /** + * Called when the ResourceManager wants the ApplicationMaster to shutdown + * for being out of sync etc. The ApplicationMaster should not unregister + * with the RM unless the ApplicationMaster wants to be the last attempt. + */ + public void onShutdownRequest(); + + /** + * Called when nodes tracked by the ResourceManager have changed in health, + * availability etc. + */ + public void onNodesUpdated(List updatedNodes); + + public float getProgress(); + + public void onError(Exception e); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java new file mode 100644 index 0000000000..6b9b7ee3bf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.async; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; +import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +/** + * NMClientAsync handles communication with all the NodeManagers + * and provides asynchronous updates on getting responses from them. It + * maintains a thread pool to communicate with individual NMs where a number of + * worker threads process requests to NMs by using {@link NMClientImpl}. The max + * size of the thread pool is configurable through + * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}. + * + * It should be used in conjunction with a CallbackHandler. For example + * + *
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ *   public void onContainerStarted(ContainerId containerId,
+ *       Map allServiceResponse) {
+ *     [post process after the container is started, process the response]
+ *   }
+ *
+ *   public void onContainerStatusReceived(ContainerId containerId,
+ *       ContainerStatus containerStatus) {
+ *     [make use of the status of the container]
+ *   }
+ *
+ *   public void onContainerStopped(ContainerId containerId) {
+ *     [post process after the container is stopped]
+ *   }
+ *
+ *   public void onStartContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onGetContainerStatusError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onStopContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ * }
+ * }
+ * 
+ * + * The client's life-cycle should be managed like the following: + * + *
+ * {@code
+ * NMClientAsync asyncClient = 
+ *     NMClientAsync.createNMClientAsync(new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainer(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ * 
+ */ +@Public +@Stable +public abstract class NMClientAsync extends AbstractService { + + protected NMClient client; + protected CallbackHandler callbackHandler; + + public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) { + return new NMClientAsyncImpl(callbackHandler); + } + + protected NMClientAsync(CallbackHandler callbackHandler) { + this (NMClientAsync.class.getName(), callbackHandler); + } + + protected NMClientAsync(String name, CallbackHandler callbackHandler) { + this (name, new NMClientImpl(), callbackHandler); + } + + @Private + @VisibleForTesting + protected NMClientAsync(String name, NMClient client, + CallbackHandler callbackHandler) { + super(name); + this.setClient(client); + this.setCallbackHandler(callbackHandler); + } + + public abstract void startContainerAsync( + Container container, ContainerLaunchContext containerLaunchContext); + + public abstract void stopContainerAsync( + ContainerId containerId, NodeId nodeId, Token containerToken); + + public abstract void getContainerStatusAsync( + ContainerId containerId, NodeId nodeId, Token containerToken); + + public NMClient getClient() { + return client; + } + + public void setClient(NMClient client) { + this.client = client; + } + + public CallbackHandler getCallbackHandler() { + return callbackHandler; + } + + public void setCallbackHandler(CallbackHandler callbackHandler) { + this.callbackHandler = callbackHandler; + } + + /** + *

+ * The callback interface needs to be implemented by {@link NMClientAsync} + * users. The APIs are called when responses from NodeManager are + * available. + *

+ * + *

+ * Once a callback happens, the users can chose to act on it in blocking or + * non-blocking manner. If the action on callback is done in a blocking + * manner, some of the threads performing requests on NodeManagers may get + * blocked depending on how many threads in the pool are busy. + *

+ * + *

+ * The implementation of the callback function should not throw the + * unexpected exception. Otherwise, {@link NMClientAsync} will just + * catch, log and then ignore it. + *

+ */ + public static interface CallbackHandler { + /** + * The API is called when NodeManager responds to indicate its + * acceptance of the starting container request + * @param containerId the Id of the container + * @param allServiceResponse a Map between the auxiliary service names and + * their outputs + */ + void onContainerStarted(ContainerId containerId, + Map allServiceResponse); + + /** + * The API is called when NodeManager responds with the status + * of the container + * @param containerId the Id of the container + * @param containerStatus the status of the container + */ + void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus); + + /** + * The API is called when NodeManager responds to indicate the + * container is stopped. + * @param containerId the Id of the container + */ + void onContainerStopped(ContainerId containerId); + + /** + * The API is called when an exception is raised in the process of + * starting a container + * + * @param containerId the Id of the container + * @param t the raised exception + */ + void onStartContainerError(ContainerId containerId, Throwable t); + + /** + * The API is called when an exception is raised in the process of + * querying the status of a container + * + * @param containerId the Id of the container + * @param t the raised exception + */ + void onGetContainerStatusError(ContainerId containerId, Throwable t); + + /** + * The API is called when an exception is raised in the process of + * stopping a container + * + * @param containerId the Id of the container + * @param t the raised exception + */ + void onStopContainerError(ContainerId containerId, Throwable t); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java similarity index 76% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 09918ddd43..c188231d0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.async.impl; import java.io.IOException; import java.util.Collection; @@ -24,15 +24,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -44,65 +41,24 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; -/** - * AMRMClientAsync handles communication with the ResourceManager - * and provides asynchronous updates on events such as container allocations and - * completions. It contains a thread that sends periodic heartbeats to the - * ResourceManager. - * - * It should be used by implementing a CallbackHandler: - *
- * {@code
- * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
- *   public void onContainersAllocated(List containers) {
- *     [run tasks on the containers]
- *   }
- *   
- *   public void onContainersCompleted(List statuses) {
- *     [update progress, check whether app is done]
- *   }
- *   
- *   public void onNodesUpdated(List updated) {}
- *   
- *   public void onReboot() {}
- * }
- * }
- * 
- * - * The client's lifecycle should be managed similarly to the following: - * - *
- * {@code
- * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * RegisterApplicationMasterResponse response = asyncClient
- *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- *       appMasterTrackingUrl);
- * asyncClient.addContainerRequest(containerRequest);
- * [... wait for application to complete]
- * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
- * asyncClient.stop();
- * }
- * 
- */ +@Private @Unstable -@Evolving -public class AMRMClientAsync extends AbstractService { +public class AMRMClientAsyncImpl +extends AMRMClientAsync { - private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); + private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class); - private final AMRMClient client; - private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); private final HeartbeatThread heartbeatThread; private final CallbackHandlerThread handlerThread; - private final CallbackHandler handler; private final BlockingQueue responseQueue; @@ -113,19 +69,16 @@ public class AMRMClientAsync extends AbstractService private volatile Exception savedException; - public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, + public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs, CallbackHandler callbackHandler) { this(new AMRMClientImpl(id), intervalMs, callbackHandler); } @Private @VisibleForTesting - protected AMRMClientAsync(AMRMClient client, int intervalMs, + public AMRMClientAsyncImpl(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) { - super(AMRMClientAsync.class.getName()); - this.client = client; - this.heartbeatIntervalMs.set(intervalMs); - handler = callbackHandler; + super(client, intervalMs, callbackHandler); heartbeatThread = new HeartbeatThread(); handlerThread = new CallbackHandlerThread(); responseQueue = new LinkedBlockingQueue(); @@ -386,38 +339,4 @@ public void run() { } } } - - public interface CallbackHandler { - - /** - * Called when the ResourceManager responds to a heartbeat with completed - * containers. If the response contains both completed containers and - * allocated containers, this will be called before containersAllocated. - */ - public void onContainersCompleted(List statuses); - - /** - * Called when the ResourceManager responds to a heartbeat with allocated - * containers. If the response containers both completed containers and - * allocated containers, this will be called after containersCompleted. - */ - public void onContainersAllocated(List containers); - - /** - * Called when the ResourceManager wants the ApplicationMaster to shutdown - * for being out of sync etc. The ApplicationMaster should not unregister - * with the RM unless the ApplicationMaster wants to be the last attempt. - */ - public void onShutdownRequest(); - - /** - * Called when nodes tracked by the ResourceManager have changed in health, - * availability etc. - */ - public void onNodesUpdated(List updatedNodes); - - public float getProgress(); - - public void onError(Exception e); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java similarity index 76% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 3b8fda2f73..0622b2d08c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.async.impl; import java.io.IOException; import java.nio.ByteBuffer; @@ -39,16 +39,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.event.EventHandler; @@ -63,75 +64,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -/** - * NMClientAsync handles communication with all the NodeManagers - * and provides asynchronous updates on getting responses from them. It - * maintains a thread pool to communicate with individual NMs where a number of - * worker threads process requests to NMs by using {@link NMClientImpl}. The max - * size of the thread pool is configurable through - * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}. - * - * It should be used in conjunction with a CallbackHandler. For example - * - *
- * {@code
- * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
- *   public void onContainerStarted(ContainerId containerId,
- *       Map allServiceResponse) {
- *     [post process after the container is started, process the response]
- *   }
- *
- *   public void onContainerStatusReceived(ContainerId containerId,
- *       ContainerStatus containerStatus) {
- *     [make use of the status of the container]
- *   }
- *
- *   public void onContainerStopped(ContainerId containerId) {
- *     [post process after the container is stopped]
- *   }
- *
- *   public void onStartContainerError(
- *       ContainerId containerId, Throwable t) {
- *     [handle the raised exception]
- *   }
- *
- *   public void onGetContainerStatusError(
- *       ContainerId containerId, Throwable t) {
- *     [handle the raised exception]
- *   }
- *
- *   public void onStopContainerError(
- *       ContainerId containerId, Throwable t) {
- *     [handle the raised exception]
- *   }
- * }
- * }
- * 
- * - * The client's life-cycle should be managed like the following: - * - *
- * {@code
- * NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * asyncClient.startContainer(container, containerLaunchContext);
- * [... wait for container being started]
- * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
- *     container.getContainerToken());
- * [... handle the status in the callback instance]
- * asyncClient.stopContainer(container.getId(), container.getNodeId(),
- *     container.getContainerToken());
- * [... wait for container being stopped]
- * asyncClient.stop();
- * }
- * 
- */ +@Private @Unstable -@Evolving -public class NMClientAsync extends AbstractService { +public class NMClientAsyncImpl extends NMClientAsync { - private static final Log LOG = LogFactory.getLog(NMClientAsync.class); + private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class); protected static final int INITIAL_THREAD_POOL_SIZE = 10; @@ -142,25 +79,22 @@ public class NMClientAsync extends AbstractService { protected BlockingQueue events = new LinkedBlockingQueue(); - protected NMClient client; - protected CallbackHandler callbackHandler; - protected ConcurrentMap containers = new ConcurrentHashMap(); - public NMClientAsync(CallbackHandler callbackHandler) { - this (NMClientAsync.class.getName(), callbackHandler); + public NMClientAsyncImpl(CallbackHandler callbackHandler) { + this (NMClientAsyncImpl.class.getName(), callbackHandler); } - public NMClientAsync(String name, CallbackHandler callbackHandler) { + public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { this (name, new NMClientImpl(), callbackHandler); } @Private @VisibleForTesting - protected NMClientAsync(String name, NMClient client, + protected NMClientAsyncImpl(String name, NMClient client, CallbackHandler callbackHandler) { - super(name); + super(name, client, callbackHandler); this.client = client; this.callbackHandler = callbackHandler; } @@ -268,7 +202,7 @@ protected void serviceStop() throws Exception { // If NMClientImpl doesn't stop running containers, the states doesn't // need to be cleared. if (!(client instanceof NMClientImpl) || - ((NMClientImpl) client).cleanupRunningContainers.get()) { + ((NMClientImpl) client).getCleanupRunningContainers().get()) { if (containers != null) { containers.clear(); } @@ -278,7 +212,7 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - public void startContainer( + public void startContainerAsync( Container container, ContainerLaunchContext containerLaunchContext) { if (containers.putIfAbsent(container.getId(), new StatefulContainer(this, container.getId())) != null) { @@ -295,7 +229,7 @@ public void startContainer( } } - public void stopContainer(ContainerId containerId, NodeId nodeId, + public void stopContainerAsync(ContainerId containerId, NodeId nodeId, Token containerToken) { if (containers.get(containerId) == null) { callbackHandler.onStopContainerError(containerId, @@ -312,7 +246,7 @@ public void stopContainer(ContainerId containerId, NodeId nodeId, } } - public void getContainerStatus(ContainerId containerId, NodeId nodeId, + public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId, Token containerToken) { try { events.put(new ContainerEvent(containerId, nodeId, containerToken, @@ -443,10 +377,10 @@ public ContainerState transition( } assert scEvent != null; Map allServiceResponse = - container.nmClientAsync.client.startContainer( + container.nmClientAsync.getClient().startContainer( scEvent.getContainer(), scEvent.getContainerLaunchContext()); try { - container.nmClientAsync.callbackHandler.onContainerStarted( + container.nmClientAsync.getCallbackHandler().onContainerStarted( containerId, allServiceResponse); } catch (Throwable thr) { // Don't process user created unchecked exception @@ -466,7 +400,7 @@ public ContainerState transition( private ContainerState onExceptionRaised(StatefulContainer container, ContainerEvent event, Throwable t) { try { - container.nmClientAsync.callbackHandler.onStartContainerError( + container.nmClientAsync.getCallbackHandler().onStartContainerError( event.getContainerId(), t); } catch (Throwable thr) { // Don't process user created unchecked exception @@ -487,10 +421,10 @@ public ContainerState transition( StatefulContainer container, ContainerEvent event) { ContainerId containerId = event.getContainerId(); try { - container.nmClientAsync.client.stopContainer( + container.nmClientAsync.getClient().stopContainer( containerId, event.getNodeId(), event.getContainerToken()); try { - container.nmClientAsync.callbackHandler.onContainerStopped( + container.nmClientAsync.getCallbackHandler().onContainerStopped( event.getContainerId()); } catch (Throwable thr) { // Don't process user created unchecked exception @@ -510,7 +444,7 @@ public ContainerState transition( private ContainerState onExceptionRaised(StatefulContainer container, ContainerEvent event, Throwable t) { try { - container.nmClientAsync.callbackHandler.onStopContainerError( + container.nmClientAsync.getCallbackHandler().onStopContainerError( event.getContainerId(), t); } catch (Throwable thr) { // Don't process user created unchecked exception @@ -530,7 +464,7 @@ protected static class OutOfOrderTransition implements @Override public void transition(StatefulContainer container, ContainerEvent event) { try { - container.nmClientAsync.callbackHandler.onStartContainerError( + container.nmClientAsync.getCallbackHandler().onStartContainerError( event.getContainerId(), RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG)); } catch (Throwable thr) { @@ -641,80 +575,4 @@ private void onExceptionRaised(ContainerId containerId, Throwable t) { } } - /** - *

- * The callback interface needs to be implemented by {@link NMClientAsync} - * users. The APIs are called when responses from NodeManager are - * available. - *

- * - *

- * Once a callback happens, the users can chose to act on it in blocking or - * non-blocking manner. If the action on callback is done in a blocking - * manner, some of the threads performing requests on NodeManagers may get - * blocked depending on how many threads in the pool are busy. - *

- * - *

- * The implementation of the callback function should not throw the - * unexpected exception. Otherwise, {@link NMClientAsync} will just - * catch, log and then ignore it. - *

- */ - public static interface CallbackHandler { - /** - * The API is called when NodeManager responds to indicate its - * acceptance of the starting container request - * @param containerId the Id of the container - * @param allServiceResponse a Map between the auxiliary service names and - * their outputs - */ - void onContainerStarted(ContainerId containerId, - Map allServiceResponse); - - /** - * The API is called when NodeManager responds with the status - * of the container - * @param containerId the Id of the container - * @param containerStatus the status of the container - */ - void onContainerStatusReceived(ContainerId containerId, - ContainerStatus containerStatus); - - /** - * The API is called when NodeManager responds to indicate the - * container is stopped. - * @param containerId the Id of the container - */ - void onContainerStopped(ContainerId containerId); - - /** - * The API is called when an exception is raised in the process of - * starting a container - * - * @param containerId the Id of the container - * @param t the raised exception - */ - void onStartContainerError(ContainerId containerId, Throwable t); - - /** - * The API is called when an exception is raised in the process of - * querying the status of a container - * - * @param containerId the Id of the container - * @param t the raised exception - */ - void onGetContainerStatusError(ContainerId containerId, Throwable t); - - /** - * The API is called when an exception is raised in the process of - * stopping a container - * - * @param containerId the Id of the container - * @param t the raised exception - */ - void onStopContainerError(ContainerId containerId, Throwable t); - - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java new file mode 100644 index 0000000000..b9c7d5ef44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.api.async.impl; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java new file mode 100644 index 0000000000..9a40b9ad29 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.api.async; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java similarity index 92% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index c24b6fa4f5..a61ae0c967 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; import java.net.InetSocketAddress; @@ -57,7 +57,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -71,6 +72,7 @@ // TODO check inputs for null etc. YARN-654 +@Private @Unstable public class AMRMClientImpl extends AMRMClient { @@ -312,64 +314,64 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, @Override public synchronized void addContainerRequest(T req) { Set allRacks = new HashSet(); - if (req.racks != null) { - allRacks.addAll(req.racks); - if(req.racks.size() != allRacks.size()) { + if (req.getRacks() != null) { + allRacks.addAll(req.getRacks()); + if(req.getRacks().size() != allRacks.size()) { Joiner joiner = Joiner.on(','); LOG.warn("ContainerRequest has duplicate racks: " - + joiner.join(req.racks)); + + joiner.join(req.getRacks())); } } - allRacks.addAll(resolveRacks(req.nodes)); + allRacks.addAll(resolveRacks(req.getNodes())); - if (req.nodes != null) { - HashSet dedupedNodes = new HashSet(req.nodes); - if(dedupedNodes.size() != req.nodes.size()) { + if (req.getNodes() != null) { + HashSet dedupedNodes = new HashSet(req.getNodes()); + if(dedupedNodes.size() != req.getNodes().size()) { Joiner joiner = Joiner.on(','); LOG.warn("ContainerRequest has duplicate nodes: " - + joiner.join(req.nodes)); + + joiner.join(req.getNodes())); } for (String node : dedupedNodes) { // Ensure node requests are accompanied by requests for // corresponding rack - addResourceRequest(req.priority, node, req.capability, - req.containerCount, req); + addResourceRequest(req.getPriority(), node, req.getCapability(), + req.getContainerCount(), req); } } for (String rack : allRacks) { - addResourceRequest(req.priority, rack, req.capability, - req.containerCount, req); + addResourceRequest(req.getPriority(), rack, req.getCapability(), + req.getContainerCount(), req); } // Off-switch - addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.containerCount, req); + addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(), + req.getContainerCount(), req); } @Override public synchronized void removeContainerRequest(T req) { Set allRacks = new HashSet(); - if (req.racks != null) { - allRacks.addAll(req.racks); + if (req.getRacks() != null) { + allRacks.addAll(req.getRacks()); } - allRacks.addAll(resolveRacks(req.nodes)); + allRacks.addAll(resolveRacks(req.getNodes())); // Update resource requests - if (req.nodes != null) { - for (String node : new HashSet(req.nodes)) { - decResourceRequest(req.priority, node, req.capability, - req.containerCount, req); + if (req.getNodes() != null) { + for (String node : new HashSet(req.getNodes())) { + decResourceRequest(req.getPriority(), node, req.getCapability(), + req.getContainerCount(), req); } } for (String rack : allRacks) { - decResourceRequest(req.priority, rack, req.capability, - req.containerCount, req); + decResourceRequest(req.getPriority(), rack, req.getCapability(), + req.getContainerCount(), req); } - decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.containerCount, req); + decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(), + req.getContainerCount(), req); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java similarity index 95% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 042af516cf..1d9f128daa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; import java.net.InetSocketAddress; @@ -29,6 +29,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -76,6 +79,8 @@ * {@link #stopContainer}. *

*/ +@Private +@Unstable public class NMClientImpl extends NMClient { private static final Log LOG = LogFactory.getLog(NMClientImpl.class); @@ -86,7 +91,7 @@ public class NMClientImpl extends NMClient { new ConcurrentHashMap(); //enabled by default - protected final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); + private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); public NMClientImpl() { super(NMClientImpl.class.getName()); @@ -100,7 +105,7 @@ public NMClientImpl(String name) { protected void serviceStop() throws Exception { // Usually, started-containers are stopped when this client stops. Unless // the flag cleanupRunningContainers is set to false. - if (cleanupRunningContainers.get()) { + if (getCleanupRunningContainers().get()) { cleanupRunningContainers(); } super.serviceStop(); @@ -126,7 +131,7 @@ protected synchronized void cleanupRunningContainers() { @Override public void cleanupRunningContainersOnStop(boolean enabled) { - cleanupRunningContainers.set(enabled); + getCleanupRunningContainers().set(enabled); } protected static class StartedContainer { @@ -171,7 +176,7 @@ public NMCommunicator(ContainerId containerId, NodeId nodeId, } @Override - protected void serviceStart() throws Exception { + protected synchronized void serviceStart() throws Exception { final YarnRPC rpc = YarnRPC.create(getConfig()); final InetSocketAddress containerAddress = @@ -199,7 +204,7 @@ public ContainerManagementProtocol run() { } @Override - protected void serviceStop() throws Exception { + protected synchronized void serviceStop() throws Exception { if (this.containerManager != null) { RPC.stopProxy(this.containerManager); @@ -397,4 +402,8 @@ protected synchronized StartedContainer getStartedContainer( return startedContainers.get(containerId); } + public AtomicBoolean getCleanupRunningContainers() { + return cleanupRunningContainers; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java similarity index 96% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index fc41bac859..3e1d579346 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; import java.net.InetSocketAddress; @@ -25,8 +25,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; @@ -56,13 +56,16 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.Records; -@InterfaceAudience.Public -@InterfaceStability.Evolving +import com.google.common.annotations.VisibleForTesting; + +@Private +@Unstable public class YarnClientImpl extends YarnClient { private static final Log LOG = LogFactory.getLog(YarnClientImpl.class); @@ -304,4 +307,10 @@ private void getChildQueues(QueueInfo parent, List queues, } } } + + @Private + @VisibleForTesting + public void setRMClient(ApplicationClientProtocol rmClient) { + this.rmClient = rmClient; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java new file mode 100644 index 0000000000..6d5fb76150 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.api.impl; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java new file mode 100644 index 0000000000..32084f37ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.api; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 6bcd804f8a..312aab2513 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -27,12 +27,16 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +@Private +@Unstable public class ApplicationCLI extends YarnCLI { private static final String APPLICATIONS_PATTERN = "%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java index d8c05a8b21..1de45ed96a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java @@ -28,12 +28,16 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.lang.time.DateFormatUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +@Private +@Unstable public class NodeCLI extends YarnCLI { private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" + System.getProperty("line.separator"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 060ff4bc1f..3ea76168fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -23,6 +23,8 @@ import java.security.PrivilegedAction; import java.util.Arrays; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.ipc.RemoteException; @@ -42,6 +44,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +@Private +@Unstable public class RMAdminCLI extends Configured implements Tool { private final RecordFactory recordFactory = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java index aa7cb8d519..5f86033e65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java @@ -19,12 +19,15 @@ import java.io.PrintStream; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; -import org.apache.hadoop.yarn.client.YarnClient; -import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +@Private +@Unstable public abstract class YarnCLI extends Configured implements Tool { public static final String STATUS_CMD = "status"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java new file mode 100644 index 0000000000..85cfd7aca9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.cli; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java similarity index 95% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index dd8a1c9421..6c1fcb2ad4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.async.impl; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; @@ -46,7 +46,10 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -105,7 +108,7 @@ public Resource answer(InvocationOnMock invocation) }); AMRMClientAsync asyncClient = - new AMRMClientAsync(client, 20, callbackHandler); + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); asyncClient.registerApplicationMaster("localhost", 1234, null); @@ -160,7 +163,7 @@ public void testAMRMClientAsyncException() throws Exception { when(client.allocate(anyFloat())).thenThrow(mockException); AMRMClientAsync asyncClient = - new AMRMClientAsync(client, 20, callbackHandler); + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); @@ -195,7 +198,7 @@ public void testAMRMClientAsyncReboot() throws Exception { when(client.allocate(anyFloat())).thenReturn(rebootResponse); AMRMClientAsync asyncClient = - new AMRMClientAsync(client, 20, callbackHandler); + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java index 4c9c124438..4f659482b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.async.impl; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; @@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -62,7 +65,7 @@ public class TestNMClientAsync { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private NMClientAsync asyncClient; + private NMClientAsyncImpl asyncClient; private NodeId nodeId; private Token containerToken; @@ -71,7 +74,7 @@ public void teardown() { ServiceOperations.stop(asyncClient); } - @Test (timeout = 30000) + @Test (timeout = 10000) public void testNMClientAsync() throws Exception { Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10); @@ -89,40 +92,42 @@ public void testNMClientAsync() throws Exception { for (int i = 0; i < expectedSuccess + expectedFailure; ++i) { if (i == expectedSuccess) { - while (!((TestCallbackHandler1) asyncClient.callbackHandler) + while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) .isAllSuccessCallsExecuted()) { Thread.sleep(10); } - asyncClient.client = mockNMClient(1); + asyncClient.setClient(mockNMClient(1)); } Container container = mockContainer(i); ContainerLaunchContext clc = recordFactory.newRecordInstance(ContainerLaunchContext.class); - asyncClient.startContainer(container, clc); + asyncClient.startContainerAsync(container, clc); } - while (!((TestCallbackHandler1) asyncClient.callbackHandler) + while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) .isStartAndQueryFailureCallsExecuted()) { Thread.sleep(10); } - asyncClient.client = mockNMClient(2); - ((TestCallbackHandler1) asyncClient.callbackHandler).path = false; + asyncClient.setClient(mockNMClient(2)); + ((TestCallbackHandler1) asyncClient.getCallbackHandler()).path = false; for (int i = 0; i < expectedFailure; ++i) { Container container = mockContainer( expectedSuccess + expectedFailure + i); ContainerLaunchContext clc = recordFactory.newRecordInstance(ContainerLaunchContext.class); - asyncClient.startContainer(container, clc); + asyncClient.startContainerAsync(container, clc); } - while (!((TestCallbackHandler1) asyncClient.callbackHandler) + while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) .isStopFailureCallsExecuted()) { Thread.sleep(10); } for (String errorMsg : - ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) { + ((TestCallbackHandler1) asyncClient.getCallbackHandler()) + .errorMsgs) { System.out.println(errorMsg); } Assert.assertEquals("Error occurs in CallbackHandler", 0, - ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size()); + ((TestCallbackHandler1) asyncClient.getCallbackHandler()) + .errorMsgs.size()); for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) { System.out.println(errorMsg); } @@ -141,7 +146,7 @@ public void testNMClientAsync() throws Exception { asyncClient.threadPool.isShutdown()); } - private class MockNMClientAsync1 extends NMClientAsync { + private class MockNMClientAsync1 extends NMClientAsyncImpl { private Set errorMsgs = Collections.synchronizedSet(new HashSet()); @@ -227,10 +232,10 @@ public void onContainerStarted(ContainerId containerId, actualStartSuccessArray.set(containerId.getId(), 1); // move on to the following success tests - asyncClient.getContainerStatus(containerId, nodeId, containerToken); + asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken); } else { // move on to the following failure tests - asyncClient.stopContainer(containerId, nodeId, containerToken); + asyncClient.stopContainerAsync(containerId, nodeId, containerToken); } // Shouldn't crash the test thread @@ -248,7 +253,7 @@ public void onContainerStatusReceived(ContainerId containerId, actualQuerySuccess.addAndGet(1); actualQuerySuccessArray.set(containerId.getId(), 1); // move on to the following success tests - asyncClient.stopContainer(containerId, nodeId, containerToken); + asyncClient.stopContainerAsync(containerId, nodeId, containerToken); // Shouldn't crash the test thread throw new RuntimeException("Ignorable Exception"); @@ -285,7 +290,7 @@ public void onStartContainerError(ContainerId containerId, Throwable t) { actualStartFailure.addAndGet(1); actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1); // move on to the following failure tests - asyncClient.getContainerStatus(containerId, nodeId, containerToken); + asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken); // Shouldn't crash the test thread throw new RuntimeException("Ignorable Exception"); @@ -426,22 +431,22 @@ public void testOutOfOrder() throws Exception { Thread t = new Thread() { @Override public void run() { - asyncClient.startContainer(container, clc); + asyncClient.startContainerAsync(container, clc); } }; t.start(); barrierA.await(); - asyncClient.stopContainer(container.getId(), container.getNodeId(), + asyncClient.stopContainerAsync(container.getId(), container.getNodeId(), container.getContainerToken()); barrierC.await(); Assert.assertFalse("Starting and stopping should be out of order", - ((TestCallbackHandler2) asyncClient.callbackHandler) + ((TestCallbackHandler2) asyncClient.getCallbackHandler()) .exceptionOccurred.get()); } - private class MockNMClientAsync2 extends NMClientAsync { + private class MockNMClientAsync2 extends NMClientAsyncImpl { private CyclicBarrier barrierA; private CyclicBarrier barrierB; @@ -510,7 +515,7 @@ public void onContainerStopped(ContainerId containerId) { @Override public void onStartContainerError(ContainerId containerId, Throwable t) { - if (!t.getMessage().equals(NMClientAsync.StatefulContainer + if (!t.getMessage().equals(NMClientAsyncImpl.StatefulContainer .OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) { exceptionOccurred.set(true); return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java similarity index 98% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 84a7252bf2..9398f984d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -57,8 +57,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index ab2de5bc93..735b8edaa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import java.util.Arrays; import java.util.List; @@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.junit.Test; -import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest; import static org.junit.Assert.assertEquals; public class TestAMRMClientContainerRequest { @@ -72,8 +73,8 @@ public void reloadCachedMappings() {} private void verifyResourceRequestLocation( AMRMClientImpl client, ContainerRequest request, String location) { - ResourceRequest ask = client.remoteRequestsTable.get(request.priority) - .get(location).get(request.capability).remoteRequest; + ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority()) + .get(location).get(request.getCapability()).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(request.getContainerCount(), ask.getNumContainers()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java similarity index 95% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index de4546123d..232903b4ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -51,7 +51,13 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -164,9 +170,9 @@ private void stopNmClient(boolean stopContainers) { // leave one unclosed assertEquals(1, nmClient.startedContainers.size()); // default true - assertTrue(nmClient.cleanupRunningContainers.get()); + assertTrue(nmClient.getCleanupRunningContainers().get()); nmClient.cleanupRunningContainersOnStop(stopContainers); - assertEquals(stopContainers, nmClient.cleanupRunningContainers.get()); + assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get()); nmClient.stop(); } @@ -201,7 +207,7 @@ public void testNMClient() // stop the running containers on close assertFalse(nmClient.startedContainers.isEmpty()); nmClient.cleanupRunningContainersOnStop(true); - assertTrue(nmClient.cleanupRunningContainers.get()); + assertTrue(nmClient.getCleanupRunningContainers().get()); nmClient.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 768371be0b..1c9b750558 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.api.impl; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 49d7867767..365bc8e0bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.util.Records; import org.junit.Before; import org.junit.Test;