diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
index 349a019459..13b93014a3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
@@ -20,9 +20,12 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -40,34 +43,85 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ProtoUtils;
-public class ResourceMgrDelegate extends YarnClientImpl {
+import com.google.common.annotations.VisibleForTesting;
+
+public class ResourceMgrDelegate extends YarnClient {
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
private YarnConfiguration conf;
private GetNewApplicationResponse application;
private ApplicationId applicationId;
+ @Private
+ @VisibleForTesting
+ protected YarnClient client;
+ private InetSocketAddress rmAddress;
/**
- * Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}.
+ * Delegate responsible for communicating with the Resource Manager's
+ * {@link ApplicationClientProtocol}.
* @param conf the configuration object.
*/
public ResourceMgrDelegate(YarnConfiguration conf) {
- super();
+ this(conf, null);
+ }
+
+ /**
+ * Delegate responsible for communicating with the Resource Manager's
+ * {@link ApplicationClientProtocol}.
+ * @param conf the configuration object.
+ * @param rmAddress the address of the Resource Manager
+ */
+ public ResourceMgrDelegate(YarnConfiguration conf,
+ InetSocketAddress rmAddress) {
+ super(ResourceMgrDelegate.class.getName());
this.conf = conf;
+ this.rmAddress = rmAddress;
+ if (rmAddress == null) {
+ client = YarnClient.createYarnClient();
+ } else {
+ client = YarnClient.createYarnClient(rmAddress);
+ }
init(conf);
start();
}
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ if (rmAddress == null) {
+ this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ }
+ client.init(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ client.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ client.stop();
+ super.serviceStop();
+ }
+
public TaskTrackerInfo[] getActiveTrackers() throws IOException,
InterruptedException {
try {
- return TypeConverter.fromYarnNodes(super.getNodeReports());
+ return TypeConverter.fromYarnNodes(client.getNodeReports());
} catch (YarnException e) {
throw new IOException(e);
}
@@ -75,7 +129,7 @@ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
public JobStatus[] getAllJobs() throws IOException, InterruptedException {
try {
- return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
+ return TypeConverter.fromYarnApps(client.getApplicationList(), this.conf);
} catch (YarnException e) {
throw new IOException(e);
}
@@ -91,7 +145,7 @@ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
try {
- YarnClusterMetrics metrics = super.getYarnClusterMetrics();
+ YarnClusterMetrics metrics = client.getYarnClusterMetrics();
ClusterMetrics oldMetrics =
new ClusterMetrics(1, 1, 1, 1, 1, 1,
metrics.getNumNodeManagers() * 10,
@@ -112,7 +166,7 @@ public Token getDelegationToken(Text renewer) throws IOException,
InterruptedException {
try {
return ProtoUtils.convertFromProtoFormat(
- super.getRMDelegationToken(renewer), rmAddress);
+ client.getRMDelegationToken(renewer), rmAddress);
} catch (YarnException e) {
throw new IOException(e);
}
@@ -124,7 +178,7 @@ public String getFilesystemName() throws IOException, InterruptedException {
public JobID getNewJobID() throws IOException, InterruptedException {
try {
- this.application = super.getNewApplication();
+ this.application = client.getNewApplication();
this.applicationId = this.application.getApplicationId();
return TypeConverter.fromYarn(applicationId);
} catch (YarnException e) {
@@ -136,7 +190,7 @@ public QueueInfo getQueue(String queueName) throws IOException,
InterruptedException {
try {
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
- super.getQueueInfo(queueName);
+ client.getQueueInfo(queueName);
return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo,
conf);
} catch (YarnException e) {
@@ -147,7 +201,7 @@ public QueueInfo getQueue(String queueName) throws IOException,
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
InterruptedException {
try {
- return TypeConverter.fromYarnQueueUserAclsInfo(super
+ return TypeConverter.fromYarnQueueUserAclsInfo(client
.getQueueAclsInfo());
} catch (YarnException e) {
throw new IOException(e);
@@ -156,7 +210,7 @@ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
public QueueInfo[] getQueues() throws IOException, InterruptedException {
try {
- return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
+ return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
} catch (YarnException e) {
throw new IOException(e);
}
@@ -164,7 +218,7 @@ public QueueInfo[] getQueues() throws IOException, InterruptedException {
public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
try {
- return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(),
+ return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
this.conf);
} catch (YarnException e) {
throw new IOException(e);
@@ -174,7 +228,7 @@ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
public QueueInfo[] getChildQueues(String parent) throws IOException,
InterruptedException {
try {
- return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+ return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
this.conf);
} catch (YarnException e) {
throw new IOException(e);
@@ -216,4 +270,82 @@ public long getProtocolVersion(String arg0, long arg1) throws IOException {
public ApplicationId getApplicationId() {
return applicationId;
}
+
+ @Override
+ public GetNewApplicationResponse getNewApplication() throws YarnException,
+ IOException {
+ return client.getNewApplication();
+ }
+
+ @Override
+ public ApplicationId
+ submitApplication(ApplicationSubmissionContext appContext)
+ throws YarnException, IOException {
+ return client.submitApplication(appContext);
+ }
+
+ @Override
+ public void killApplication(ApplicationId applicationId)
+ throws YarnException, IOException {
+ client.killApplication(applicationId);
+ }
+
+ @Override
+ public ApplicationReport getApplicationReport(ApplicationId appId)
+ throws YarnException, IOException {
+ return client.getApplicationReport(appId);
+ }
+
+ @Override
+ public List getApplicationList() throws YarnException,
+ IOException {
+ return client.getApplicationList();
+ }
+
+ @Override
+ public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
+ IOException {
+ return client.getYarnClusterMetrics();
+ }
+
+ @Override
+ public List getNodeReports() throws YarnException, IOException {
+ return client.getNodeReports();
+ }
+
+ @Override
+ public org.apache.hadoop.yarn.api.records.Token getRMDelegationToken(
+ Text renewer) throws YarnException, IOException {
+ return client.getRMDelegationToken(renewer);
+ }
+
+ @Override
+ public org.apache.hadoop.yarn.api.records.QueueInfo getQueueInfo(
+ String queueName) throws YarnException, IOException {
+ return client.getQueueInfo(queueName);
+ }
+
+ @Override
+ public List getAllQueues()
+ throws YarnException, IOException {
+ return client.getAllQueues();
+ }
+
+ @Override
+ public List getRootQueueInfos()
+ throws YarnException, IOException {
+ return client.getRootQueueInfos();
+ }
+
+ @Override
+ public List getChildQueueInfos(
+ String parent) throws YarnException, IOException {
+ return client.getChildQueueInfos(parent);
+ }
+
+ @Override
+ public List getQueueAclsInfo() throws YarnException,
+ IOException {
+ return client.getQueueAclsInfo();
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
index 816804d671..2339fb58d2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
@@ -67,8 +68,9 @@ public void testGetRootQueues() throws IOException, InterruptedException {
ResourceMgrDelegate delegate = new ResourceMgrDelegate(
new YarnConfiguration()) {
@Override
- protected void serviceStart() {
- this.rmClient = applicationsManager;
+ protected void serviceStart() throws Exception {
+ Assert.assertTrue(this.client instanceof YarnClientImpl);
+ ((YarnClientImpl) this.client).setRMClient(applicationsManager);
}
};
delegate.getRootQueues();
@@ -110,8 +112,9 @@ public void tesAllJobs() throws Exception {
ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
new YarnConfiguration()) {
@Override
- protected void serviceStart() {
- this.rmClient = applicationsManager;
+ protected void serviceStart() throws Exception {
+ Assert.assertTrue(this.client instanceof YarnClientImpl);
+ ((YarnClientImpl) this.client).setRMClient(applicationsManager);
}
};
JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index 0ccfcc306a..0046c9b330 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -82,6 +82,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -200,8 +201,9 @@ public void testResourceMgrDelegate() throws Exception {
final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class);
ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
@Override
- protected void serviceStart() {
- this.rmClient = clientRMProtocol;
+ protected void serviceStart() throws Exception {
+ assertTrue(this.client instanceof YarnClientImpl);
+ ((YarnClientImpl) this.client).setRMClient(clientRMProtocol);
}
};
/* make sure kill calls finish application master */
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
index 289d0f21e5..aeb20cd449 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -110,8 +111,9 @@ public void testClusterGetDelegationToken() throws Exception {
ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
new YarnConfiguration(conf)) {
@Override
- protected void serviceStart() {
- this.rmClient = cRMProtocol;
+ protected void serviceStart() throws Exception {
+ assertTrue(this.client instanceof YarnClientImpl);
+ ((YarnClientImpl) this.client).setRMClient(cRMProtocol);
}
};
yrunner.setResourceMgrDelegate(rmgrDelegate);
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index af11848864..991a30fec5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -186,6 +186,10 @@ Release 2.1.0-beta - UNRELEASED
YARN-610. ClientToken is no longer set in the environment of the Containers.
(Omkar Vinit Joshi via vinodkv)
+ YARN-834. Fixed annotations for yarn-client module, reorganized packages and
+ clearly differentiated *Async apis. (Arun C Murthy and Zhijie Shen via
+ vinodkv)
+
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 64b4d6f21c..6b7768bdc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -67,9 +67,9 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -436,17 +436,18 @@ private void printUsage(Options opts) {
* @throws YarnException
* @throws IOException
*/
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "unchecked" })
public boolean run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
- resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
+ resourceManager =
+ AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
containerListener = new NMCallbackHandler();
- nmClientAsync = new NMClientAsync(containerListener);
+ nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
@@ -682,7 +683,7 @@ public void onContainerStarted(ContainerId containerId,
}
Container container = containers.get(containerId);
if (container != null) {
- nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
+ nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
container.getContainerToken());
}
}
@@ -802,7 +803,7 @@ public void run() {
ctx.setCommands(commands);
containerListener.addContainer(container.getId(), container);
- nmClientAsync.startContainer(container, ctx);
+ nmClientAsync.startContainerAsync(container, ctx);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 6046612ed9..9e14ca4404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -60,7 +60,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
index 275df8df14..b209d95a73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
@@ -48,8 +48,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
similarity index 98%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 373cbb35d2..e497005470 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.util.Collection;
@@ -36,12 +36,13 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public
-@InterfaceStability.Unstable
+@InterfaceStability.Stable
public abstract class AMRMClient extends
AbstractService {
@@ -53,7 +54,7 @@ public abstract class AMRMClient extends
* AMRMClient.createAMRMClientContainerRequest(appAttemptId)
* }
* @param appAttemptId the appAttemptId associated with the AMRMClient
- * @return the newly created AMRMClient instance.
+ * @return the newly create AMRMClient instance.
*/
@Public
public static AMRMClient createAMRMClient(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
similarity index 97%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
index 9dd3ebda6b..e628745e0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,10 +33,11 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@InterfaceAudience.Public
-@InterfaceStability.Unstable
+@InterfaceStability.Stable
public abstract class NMClient extends AbstractService {
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
similarity index 98%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index 57ef8054fc..751c4ba809 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -37,10 +37,11 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
new file mode 100644
index 0000000000..d24750efbb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -0,0 +1,245 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * AMRMClientAsync
handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions. It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ *
+ * It should be used by implementing a CallbackHandler:
+ *
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ * public void onContainersAllocated(List containers) {
+ * [run tasks on the containers]
+ * }
+ *
+ * public void onContainersCompleted(List statuses) {
+ * [update progress, check whether app is done]
+ * }
+ *
+ * public void onNodesUpdated(List updated) {}
+ *
+ * public void onReboot() {}
+ * }
+ * }
+ *
+ *
+ * The client's lifecycle should be managed similarly to the following:
+ *
+ *
+ * {@code
+ * AMRMClientAsync asyncClient =
+ * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ * .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ * appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ *
+ */
+@Public
+@Stable
+public abstract class AMRMClientAsync
+extends AbstractService {
+
+ protected final AMRMClient client;
+ protected final CallbackHandler handler;
+ protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
+
+ public static AMRMClientAsync
+ createAMRMClientAsync(
+ ApplicationAttemptId id,
+ int intervalMs,
+ CallbackHandler callbackHandler) {
+ return new AMRMClientAsyncImpl(id, intervalMs, callbackHandler);
+ }
+
+ public static AMRMClientAsync
+ createAMRMClientAsync(
+ AMRMClient client,
+ int intervalMs,
+ CallbackHandler callbackHandler) {
+ return new AMRMClientAsyncImpl(client, intervalMs, callbackHandler);
+ }
+
+ protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+ CallbackHandler callbackHandler) {
+ this(new AMRMClientImpl(id), intervalMs, callbackHandler);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected AMRMClientAsync(AMRMClient client, int intervalMs,
+ CallbackHandler callbackHandler) {
+ super(AMRMClientAsync.class.getName());
+ this.client = client;
+ this.heartbeatIntervalMs.set(intervalMs);
+ this.handler = callbackHandler;
+ }
+
+ public void setHeartbeatInterval(int interval) {
+ heartbeatIntervalMs.set(interval);
+ }
+
+ public abstract List extends Collection> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability);
+
+ /**
+ * Registers this application master with the resource manager. On successful
+ * registration, starts the heartbeating thread.
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl)
+ throws YarnException, IOException;
+
+ /**
+ * Unregister the application master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract void unregisterApplicationMaster(
+ FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)
+ throws YarnException, IOException;
+
+ /**
+ * Request containers for resources before calling allocate
+ * @param req Resource request
+ */
+ public abstract void addContainerRequest(T req);
+
+ /**
+ * Remove previous container request. The previous container request may have
+ * already been sent to the ResourceManager. So even after the remove request
+ * the app must be prepared to receive an allocation for the previous request
+ * even after the remove request
+ * @param req Resource request
+ */
+ public abstract void removeContainerRequest(T req);
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release them.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it. eg. it released non-local resources
+ * @param containerId
+ */
+ public abstract void releaseAssignedContainer(ContainerId containerId);
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Currently available resources
+ */
+ public abstract Resource getClusterAvailableResources();
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid values is available after a call to allocate has been made
+ * @return Current number of nodes in the cluster
+ */
+ public abstract int getClusterNodeCount();
+
+ /**
+ * It returns the NMToken received on allocate call. It will not communicate
+ * with RM to get NMTokens. On allocate call whenever we receive new token
+ * along with new container AMRMClientAsync will cache this NMToken per node
+ * manager. This map returned should be shared with any application which is
+ * communicating with NodeManager (ex. NMClient / NMClientAsync) using
+ * NMTokens. If a new NMToken is received for the same node manager
+ * then it will be replaced.
+ */
+ public abstract ConcurrentMap getNMTokens();
+
+ public interface CallbackHandler {
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with completed
+ * containers. If the response contains both completed containers and
+ * allocated containers, this will be called before containersAllocated.
+ */
+ public void onContainersCompleted(List statuses);
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with allocated
+ * containers. If the response containers both completed containers and
+ * allocated containers, this will be called after containersCompleted.
+ */
+ public void onContainersAllocated(List containers);
+
+ /**
+ * Called when the ResourceManager wants the ApplicationMaster to shutdown
+ * for being out of sync etc. The ApplicationMaster should not unregister
+ * with the RM unless the ApplicationMaster wants to be the last attempt.
+ */
+ public void onShutdownRequest();
+
+ /**
+ * Called when nodes tracked by the ResourceManager have changed in health,
+ * availability etc.
+ */
+ public void onNodesUpdated(List updatedNodes);
+
+ public float getProgress();
+
+ public void onError(Exception e);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
new file mode 100644
index 0000000000..6b9b7ee3bf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * NMClientAsync
handles communication with all the NodeManagers
+ * and provides asynchronous updates on getting responses from them. It
+ * maintains a thread pool to communicate with individual NMs where a number of
+ * worker threads process requests to NMs by using {@link NMClientImpl}. The max
+ * size of the thread pool is configurable through
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
+ *
+ * It should be used in conjunction with a CallbackHandler. For example
+ *
+ *
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ * public void onContainerStarted(ContainerId containerId,
+ * Map allServiceResponse) {
+ * [post process after the container is started, process the response]
+ * }
+ *
+ * public void onContainerStatusReceived(ContainerId containerId,
+ * ContainerStatus containerStatus) {
+ * [make use of the status of the container]
+ * }
+ *
+ * public void onContainerStopped(ContainerId containerId) {
+ * [post process after the container is stopped]
+ * }
+ *
+ * public void onStartContainerError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ *
+ * public void onGetContainerStatusError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ *
+ * public void onStopContainerError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ * }
+ * }
+ *
+ *
+ * The client's life-cycle should be managed like the following:
+ *
+ *
+ * {@code
+ * NMClientAsync asyncClient =
+ * NMClientAsync.createNMClientAsync(new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainer(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
+ * container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ * container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ *
+ */
+@Public
+@Stable
+public abstract class NMClientAsync extends AbstractService {
+
+ protected NMClient client;
+ protected CallbackHandler callbackHandler;
+
+ public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
+ return new NMClientAsyncImpl(callbackHandler);
+ }
+
+ protected NMClientAsync(CallbackHandler callbackHandler) {
+ this (NMClientAsync.class.getName(), callbackHandler);
+ }
+
+ protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+ this (name, new NMClientImpl(), callbackHandler);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected NMClientAsync(String name, NMClient client,
+ CallbackHandler callbackHandler) {
+ super(name);
+ this.setClient(client);
+ this.setCallbackHandler(callbackHandler);
+ }
+
+ public abstract void startContainerAsync(
+ Container container, ContainerLaunchContext containerLaunchContext);
+
+ public abstract void stopContainerAsync(
+ ContainerId containerId, NodeId nodeId, Token containerToken);
+
+ public abstract void getContainerStatusAsync(
+ ContainerId containerId, NodeId nodeId, Token containerToken);
+
+ public NMClient getClient() {
+ return client;
+ }
+
+ public void setClient(NMClient client) {
+ this.client = client;
+ }
+
+ public CallbackHandler getCallbackHandler() {
+ return callbackHandler;
+ }
+
+ public void setCallbackHandler(CallbackHandler callbackHandler) {
+ this.callbackHandler = callbackHandler;
+ }
+
+ /**
+ *
+ * The callback interface needs to be implemented by {@link NMClientAsync}
+ * users. The APIs are called when responses from NodeManager
are
+ * available.
+ *
+ *
+ *
+ * Once a callback happens, the users can chose to act on it in blocking or
+ * non-blocking manner. If the action on callback is done in a blocking
+ * manner, some of the threads performing requests on NodeManagers may get
+ * blocked depending on how many threads in the pool are busy.
+ *
+ *
+ *
+ * The implementation of the callback function should not throw the
+ * unexpected exception. Otherwise, {@link NMClientAsync} will just
+ * catch, log and then ignore it.
+ *
+ */
+ public static interface CallbackHandler {
+ /**
+ * The API is called when NodeManager
responds to indicate its
+ * acceptance of the starting container request
+ * @param containerId the Id of the container
+ * @param allServiceResponse a Map between the auxiliary service names and
+ * their outputs
+ */
+ void onContainerStarted(ContainerId containerId,
+ Map allServiceResponse);
+
+ /**
+ * The API is called when NodeManager
responds with the status
+ * of the container
+ * @param containerId the Id of the container
+ * @param containerStatus the status of the container
+ */
+ void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus);
+
+ /**
+ * The API is called when NodeManager
responds to indicate the
+ * container is stopped.
+ * @param containerId the Id of the container
+ */
+ void onContainerStopped(ContainerId containerId);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * starting a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onStartContainerError(ContainerId containerId, Throwable t);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * querying the status of a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onGetContainerStatusError(ContainerId containerId, Throwable t);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * stopping a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onStopContainerError(ContainerId containerId, Throwable t);
+
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
similarity index 76%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 09918ddd43..c188231d0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
import java.io.IOException;
import java.util.Collection;
@@ -24,15 +24,12 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -44,65 +41,24 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
-/**
- * AMRMClientAsync
handles communication with the ResourceManager
- * and provides asynchronous updates on events such as container allocations and
- * completions. It contains a thread that sends periodic heartbeats to the
- * ResourceManager.
- *
- * It should be used by implementing a CallbackHandler:
- *
- * {@code
- * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
- * public void onContainersAllocated(List containers) {
- * [run tasks on the containers]
- * }
- *
- * public void onContainersCompleted(List statuses) {
- * [update progress, check whether app is done]
- * }
- *
- * public void onNodesUpdated(List updated) {}
- *
- * public void onReboot() {}
- * }
- * }
- *
- *
- * The client's lifecycle should be managed similarly to the following:
- *
- *
- * {@code
- * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * RegisterApplicationMasterResponse response = asyncClient
- * .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- * appMasterTrackingUrl);
- * asyncClient.addContainerRequest(containerRequest);
- * [... wait for application to complete]
- * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
- * asyncClient.stop();
- * }
- *
- */
+@Private
@Unstable
-@Evolving
-public class AMRMClientAsync extends AbstractService {
+public class AMRMClientAsyncImpl
+extends AMRMClientAsync {
- private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
+ private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class);
- private final AMRMClient client;
- private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread;
- private final CallbackHandler handler;
private final BlockingQueue responseQueue;
@@ -113,19 +69,16 @@ public class AMRMClientAsync extends AbstractService
private volatile Exception savedException;
- public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+ public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs,
CallbackHandler callbackHandler) {
this(new AMRMClientImpl(id), intervalMs, callbackHandler);
}
@Private
@VisibleForTesting
- protected AMRMClientAsync(AMRMClient client, int intervalMs,
+ public AMRMClientAsyncImpl(AMRMClient client, int intervalMs,
CallbackHandler callbackHandler) {
- super(AMRMClientAsync.class.getName());
- this.client = client;
- this.heartbeatIntervalMs.set(intervalMs);
- handler = callbackHandler;
+ super(client, intervalMs, callbackHandler);
heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue();
@@ -386,38 +339,4 @@ public void run() {
}
}
}
-
- public interface CallbackHandler {
-
- /**
- * Called when the ResourceManager responds to a heartbeat with completed
- * containers. If the response contains both completed containers and
- * allocated containers, this will be called before containersAllocated.
- */
- public void onContainersCompleted(List statuses);
-
- /**
- * Called when the ResourceManager responds to a heartbeat with allocated
- * containers. If the response containers both completed containers and
- * allocated containers, this will be called after containersCompleted.
- */
- public void onContainersAllocated(List containers);
-
- /**
- * Called when the ResourceManager wants the ApplicationMaster to shutdown
- * for being out of sync etc. The ApplicationMaster should not unregister
- * with the RM unless the ApplicationMaster wants to be the last attempt.
- */
- public void onShutdownRequest();
-
- /**
- * Called when nodes tracked by the ResourceManager have changed in health,
- * availability etc.
- */
- public void onNodesUpdated(List updatedNodes);
-
- public float getProgress();
-
- public void onError(Exception e);
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
similarity index 76%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
index 3b8fda2f73..0622b2d08c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -39,16 +39,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -63,75 +64,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-/**
- * NMClientAsync
handles communication with all the NodeManagers
- * and provides asynchronous updates on getting responses from them. It
- * maintains a thread pool to communicate with individual NMs where a number of
- * worker threads process requests to NMs by using {@link NMClientImpl}. The max
- * size of the thread pool is configurable through
- * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
- *
- * It should be used in conjunction with a CallbackHandler. For example
- *
- *
- * {@code
- * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
- * public void onContainerStarted(ContainerId containerId,
- * Map allServiceResponse) {
- * [post process after the container is started, process the response]
- * }
- *
- * public void onContainerStatusReceived(ContainerId containerId,
- * ContainerStatus containerStatus) {
- * [make use of the status of the container]
- * }
- *
- * public void onContainerStopped(ContainerId containerId) {
- * [post process after the container is stopped]
- * }
- *
- * public void onStartContainerError(
- * ContainerId containerId, Throwable t) {
- * [handle the raised exception]
- * }
- *
- * public void onGetContainerStatusError(
- * ContainerId containerId, Throwable t) {
- * [handle the raised exception]
- * }
- *
- * public void onStopContainerError(
- * ContainerId containerId, Throwable t) {
- * [handle the raised exception]
- * }
- * }
- * }
- *
- *
- * The client's life-cycle should be managed like the following:
- *
- *
- * {@code
- * NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * asyncClient.startContainer(container, containerLaunchContext);
- * [... wait for container being started]
- * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
- * container.getContainerToken());
- * [... handle the status in the callback instance]
- * asyncClient.stopContainer(container.getId(), container.getNodeId(),
- * container.getContainerToken());
- * [... wait for container being stopped]
- * asyncClient.stop();
- * }
- *
- */
+@Private
@Unstable
-@Evolving
-public class NMClientAsync extends AbstractService {
+public class NMClientAsyncImpl extends NMClientAsync {
- private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
+ private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
protected static final int INITIAL_THREAD_POOL_SIZE = 10;
@@ -142,25 +79,22 @@ public class NMClientAsync extends AbstractService {
protected BlockingQueue events =
new LinkedBlockingQueue();
- protected NMClient client;
- protected CallbackHandler callbackHandler;
-
protected ConcurrentMap containers =
new ConcurrentHashMap();
- public NMClientAsync(CallbackHandler callbackHandler) {
- this (NMClientAsync.class.getName(), callbackHandler);
+ public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+ this (NMClientAsyncImpl.class.getName(), callbackHandler);
}
- public NMClientAsync(String name, CallbackHandler callbackHandler) {
+ public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
this (name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
- protected NMClientAsync(String name, NMClient client,
+ protected NMClientAsyncImpl(String name, NMClient client,
CallbackHandler callbackHandler) {
- super(name);
+ super(name, client, callbackHandler);
this.client = client;
this.callbackHandler = callbackHandler;
}
@@ -268,7 +202,7 @@ protected void serviceStop() throws Exception {
// If NMClientImpl doesn't stop running containers, the states doesn't
// need to be cleared.
if (!(client instanceof NMClientImpl) ||
- ((NMClientImpl) client).cleanupRunningContainers.get()) {
+ ((NMClientImpl) client).getCleanupRunningContainers().get()) {
if (containers != null) {
containers.clear();
}
@@ -278,7 +212,7 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
- public void startContainer(
+ public void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext) {
if (containers.putIfAbsent(container.getId(),
new StatefulContainer(this, container.getId())) != null) {
@@ -295,7 +229,7 @@ public void startContainer(
}
}
- public void stopContainer(ContainerId containerId, NodeId nodeId,
+ public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
Token containerToken) {
if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId,
@@ -312,7 +246,7 @@ public void stopContainer(ContainerId containerId, NodeId nodeId,
}
}
- public void getContainerStatus(ContainerId containerId, NodeId nodeId,
+ public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
Token containerToken) {
try {
events.put(new ContainerEvent(containerId, nodeId, containerToken,
@@ -443,10 +377,10 @@ public ContainerState transition(
}
assert scEvent != null;
Map allServiceResponse =
- container.nmClientAsync.client.startContainer(
+ container.nmClientAsync.getClient().startContainer(
scEvent.getContainer(), scEvent.getContainerLaunchContext());
try {
- container.nmClientAsync.callbackHandler.onContainerStarted(
+ container.nmClientAsync.getCallbackHandler().onContainerStarted(
containerId, allServiceResponse);
} catch (Throwable thr) {
// Don't process user created unchecked exception
@@ -466,7 +400,7 @@ public ContainerState transition(
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
- container.nmClientAsync.callbackHandler.onStartContainerError(
+ container.nmClientAsync.getCallbackHandler().onStartContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
@@ -487,10 +421,10 @@ public ContainerState transition(
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
- container.nmClientAsync.client.stopContainer(
+ container.nmClientAsync.getClient().stopContainer(
containerId, event.getNodeId(), event.getContainerToken());
try {
- container.nmClientAsync.callbackHandler.onContainerStopped(
+ container.nmClientAsync.getCallbackHandler().onContainerStopped(
event.getContainerId());
} catch (Throwable thr) {
// Don't process user created unchecked exception
@@ -510,7 +444,7 @@ public ContainerState transition(
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
- container.nmClientAsync.callbackHandler.onStopContainerError(
+ container.nmClientAsync.getCallbackHandler().onStopContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
@@ -530,7 +464,7 @@ protected static class OutOfOrderTransition implements
@Override
public void transition(StatefulContainer container, ContainerEvent event) {
try {
- container.nmClientAsync.callbackHandler.onStartContainerError(
+ container.nmClientAsync.getCallbackHandler().onStartContainerError(
event.getContainerId(),
RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
} catch (Throwable thr) {
@@ -641,80 +575,4 @@ private void onExceptionRaised(ContainerId containerId, Throwable t) {
}
}
- /**
- *
- * The callback interface needs to be implemented by {@link NMClientAsync}
- * users. The APIs are called when responses from NodeManager
are
- * available.
- *
- *
- *
- * Once a callback happens, the users can chose to act on it in blocking or
- * non-blocking manner. If the action on callback is done in a blocking
- * manner, some of the threads performing requests on NodeManagers may get
- * blocked depending on how many threads in the pool are busy.
- *
- *
- *
- * The implementation of the callback function should not throw the
- * unexpected exception. Otherwise, {@link NMClientAsync} will just
- * catch, log and then ignore it.
- *
- */
- public static interface CallbackHandler {
- /**
- * The API is called when NodeManager
responds to indicate its
- * acceptance of the starting container request
- * @param containerId the Id of the container
- * @param allServiceResponse a Map between the auxiliary service names and
- * their outputs
- */
- void onContainerStarted(ContainerId containerId,
- Map allServiceResponse);
-
- /**
- * The API is called when NodeManager
responds with the status
- * of the container
- * @param containerId the Id of the container
- * @param containerStatus the status of the container
- */
- void onContainerStatusReceived(ContainerId containerId,
- ContainerStatus containerStatus);
-
- /**
- * The API is called when NodeManager
responds to indicate the
- * container is stopped.
- * @param containerId the Id of the container
- */
- void onContainerStopped(ContainerId containerId);
-
- /**
- * The API is called when an exception is raised in the process of
- * starting a container
- *
- * @param containerId the Id of the container
- * @param t the raised exception
- */
- void onStartContainerError(ContainerId containerId, Throwable t);
-
- /**
- * The API is called when an exception is raised in the process of
- * querying the status of a container
- *
- * @param containerId the Id of the container
- * @param t the raised exception
- */
- void onGetContainerStatusError(ContainerId containerId, Throwable t);
-
- /**
- * The API is called when an exception is raised in the process of
- * stopping a container
- *
- * @param containerId the Id of the container
- * @param t the raised exception
- */
- void onStopContainerError(ContainerId containerId, Throwable t);
-
- }
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
new file mode 100644
index 0000000000..b9c7d5ef44
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
new file mode 100644
index 0000000000..9a40b9ad29
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
similarity index 92%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index c24b6fa4f5..a61ae0c967 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -57,7 +57,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -71,6 +72,7 @@
// TODO check inputs for null etc. YARN-654
+@Private
@Unstable
public class AMRMClientImpl extends AMRMClient {
@@ -312,64 +314,64 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
@Override
public synchronized void addContainerRequest(T req) {
Set allRacks = new HashSet();
- if (req.racks != null) {
- allRacks.addAll(req.racks);
- if(req.racks.size() != allRacks.size()) {
+ if (req.getRacks() != null) {
+ allRacks.addAll(req.getRacks());
+ if(req.getRacks().size() != allRacks.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: "
- + joiner.join(req.racks));
+ + joiner.join(req.getRacks()));
}
}
- allRacks.addAll(resolveRacks(req.nodes));
+ allRacks.addAll(resolveRacks(req.getNodes()));
- if (req.nodes != null) {
- HashSet dedupedNodes = new HashSet(req.nodes);
- if(dedupedNodes.size() != req.nodes.size()) {
+ if (req.getNodes() != null) {
+ HashSet dedupedNodes = new HashSet(req.getNodes());
+ if(dedupedNodes.size() != req.getNodes().size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate nodes: "
- + joiner.join(req.nodes));
+ + joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
- addResourceRequest(req.priority, node, req.capability,
- req.containerCount, req);
+ addResourceRequest(req.getPriority(), node, req.getCapability(),
+ req.getContainerCount(), req);
}
}
for (String rack : allRacks) {
- addResourceRequest(req.priority, rack, req.capability,
- req.containerCount, req);
+ addResourceRequest(req.getPriority(), rack, req.getCapability(),
+ req.getContainerCount(), req);
}
// Off-switch
- addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
- req.containerCount, req);
+ addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
+ req.getContainerCount(), req);
}
@Override
public synchronized void removeContainerRequest(T req) {
Set allRacks = new HashSet();
- if (req.racks != null) {
- allRacks.addAll(req.racks);
+ if (req.getRacks() != null) {
+ allRacks.addAll(req.getRacks());
}
- allRacks.addAll(resolveRacks(req.nodes));
+ allRacks.addAll(resolveRacks(req.getNodes()));
// Update resource requests
- if (req.nodes != null) {
- for (String node : new HashSet(req.nodes)) {
- decResourceRequest(req.priority, node, req.capability,
- req.containerCount, req);
+ if (req.getNodes() != null) {
+ for (String node : new HashSet(req.getNodes())) {
+ decResourceRequest(req.getPriority(), node, req.getCapability(),
+ req.getContainerCount(), req);
}
}
for (String rack : allRacks) {
- decResourceRequest(req.priority, rack, req.capability,
- req.containerCount, req);
+ decResourceRequest(req.getPriority(), rack, req.getCapability(),
+ req.getContainerCount(), req);
}
- decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
- req.containerCount, req);
+ decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
+ req.getContainerCount(), req);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index 042af516cf..1d9f128daa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -29,6 +29,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -76,6 +79,8 @@
* {@link #stopContainer}.
*
*/
+@Private
+@Unstable
public class NMClientImpl extends NMClient {
private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
@@ -86,7 +91,7 @@ public class NMClientImpl extends NMClient {
new ConcurrentHashMap();
//enabled by default
- protected final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
public NMClientImpl() {
super(NMClientImpl.class.getName());
@@ -100,7 +105,7 @@ public NMClientImpl(String name) {
protected void serviceStop() throws Exception {
// Usually, started-containers are stopped when this client stops. Unless
// the flag cleanupRunningContainers is set to false.
- if (cleanupRunningContainers.get()) {
+ if (getCleanupRunningContainers().get()) {
cleanupRunningContainers();
}
super.serviceStop();
@@ -126,7 +131,7 @@ protected synchronized void cleanupRunningContainers() {
@Override
public void cleanupRunningContainersOnStop(boolean enabled) {
- cleanupRunningContainers.set(enabled);
+ getCleanupRunningContainers().set(enabled);
}
protected static class StartedContainer {
@@ -171,7 +176,7 @@ public NMCommunicator(ContainerId containerId, NodeId nodeId,
}
@Override
- protected void serviceStart() throws Exception {
+ protected synchronized void serviceStart() throws Exception {
final YarnRPC rpc = YarnRPC.create(getConfig());
final InetSocketAddress containerAddress =
@@ -199,7 +204,7 @@ public ContainerManagementProtocol run() {
}
@Override
- protected void serviceStop() throws Exception {
+ protected synchronized void serviceStop() throws Exception {
if (this.containerManager != null) {
RPC.stopProxy(this.containerManager);
@@ -397,4 +402,8 @@ protected synchronized StartedContainer getStartedContainer(
return startedContainers.get(containerId);
}
+ public AtomicBoolean getCleanupRunningContainers() {
+ return cleanupRunningContainers;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
similarity index 96%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index fc41bac859..3e1d579346 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -25,8 +25,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@@ -56,13 +56,16 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
public class YarnClientImpl extends YarnClient {
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
@@ -304,4 +307,10 @@ private void getChildQueues(QueueInfo parent, List queues,
}
}
}
+
+ @Private
+ @VisibleForTesting
+ public void setRMClient(ApplicationClientProtocol rmClient) {
+ this.rmClient = rmClient;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
new file mode 100644
index 0000000000..6d5fb76150
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
new file mode 100644
index 0000000000..32084f37ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index 6bcd804f8a..312aab2513 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -27,12 +27,16 @@
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+@Private
+@Unstable
public class ApplicationCLI extends YarnCLI {
private static final String APPLICATIONS_PATTERN =
"%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
index d8c05a8b21..1de45ed96a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
@@ -28,12 +28,16 @@
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+@Private
+@Unstable
public class NodeCLI extends YarnCLI {
private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" +
System.getProperty("line.separator");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index 060ff4bc1f..3ea76168fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -23,6 +23,8 @@
import java.security.PrivilegedAction;
import java.util.Arrays;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
@@ -42,6 +44,8 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+@Private
+@Unstable
public class RMAdminCLI extends Configured implements Tool {
private final RecordFactory recordFactory =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
index aa7cb8d519..5f86033e65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
@@ -19,12 +19,15 @@
import java.io.PrintStream;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+@Private
+@Unstable
public abstract class YarnCLI extends Configured implements Tool {
public static final String STATUS_CMD = "status";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
new file mode 100644
index 0000000000..85cfd7aca9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.cli;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index dd8a1c9421..6c1fcb2ad4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@@ -46,7 +46,10 @@
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -105,7 +108,7 @@ public Resource answer(InvocationOnMock invocation)
});
AMRMClientAsync asyncClient =
- new AMRMClientAsync(client, 20, callbackHandler);
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
asyncClient.registerApplicationMaster("localhost", 1234, null);
@@ -160,7 +163,7 @@ public void testAMRMClientAsyncException() throws Exception {
when(client.allocate(anyFloat())).thenThrow(mockException);
AMRMClientAsync asyncClient =
- new AMRMClientAsync(client, 20, callbackHandler);
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
@@ -195,7 +198,7 @@ public void testAMRMClientAsyncReboot() throws Exception {
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
AMRMClientAsync asyncClient =
- new AMRMClientAsync(client, 20, callbackHandler);
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
similarity index 91%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
index 4c9c124438..4f659482b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
@@ -48,6 +48,9 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -62,7 +65,7 @@ public class TestNMClientAsync {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- private NMClientAsync asyncClient;
+ private NMClientAsyncImpl asyncClient;
private NodeId nodeId;
private Token containerToken;
@@ -71,7 +74,7 @@ public void teardown() {
ServiceOperations.stop(asyncClient);
}
- @Test (timeout = 30000)
+ @Test (timeout = 10000)
public void testNMClientAsync() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
@@ -89,40 +92,42 @@ public void testNMClientAsync() throws Exception {
for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
if (i == expectedSuccess) {
- while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+ while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isAllSuccessCallsExecuted()) {
Thread.sleep(10);
}
- asyncClient.client = mockNMClient(1);
+ asyncClient.setClient(mockNMClient(1));
}
Container container = mockContainer(i);
ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- asyncClient.startContainer(container, clc);
+ asyncClient.startContainerAsync(container, clc);
}
- while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+ while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isStartAndQueryFailureCallsExecuted()) {
Thread.sleep(10);
}
- asyncClient.client = mockNMClient(2);
- ((TestCallbackHandler1) asyncClient.callbackHandler).path = false;
+ asyncClient.setClient(mockNMClient(2));
+ ((TestCallbackHandler1) asyncClient.getCallbackHandler()).path = false;
for (int i = 0; i < expectedFailure; ++i) {
Container container = mockContainer(
expectedSuccess + expectedFailure + i);
ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- asyncClient.startContainer(container, clc);
+ asyncClient.startContainerAsync(container, clc);
}
- while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+ while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isStopFailureCallsExecuted()) {
Thread.sleep(10);
}
for (String errorMsg :
- ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) {
+ ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .errorMsgs) {
System.out.println(errorMsg);
}
Assert.assertEquals("Error occurs in CallbackHandler", 0,
- ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size());
+ ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+ .errorMsgs.size());
for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
System.out.println(errorMsg);
}
@@ -141,7 +146,7 @@ public void testNMClientAsync() throws Exception {
asyncClient.threadPool.isShutdown());
}
- private class MockNMClientAsync1 extends NMClientAsync {
+ private class MockNMClientAsync1 extends NMClientAsyncImpl {
private Set errorMsgs =
Collections.synchronizedSet(new HashSet());
@@ -227,10 +232,10 @@ public void onContainerStarted(ContainerId containerId,
actualStartSuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
- asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+ asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
} else {
// move on to the following failure tests
- asyncClient.stopContainer(containerId, nodeId, containerToken);
+ asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
}
// Shouldn't crash the test thread
@@ -248,7 +253,7 @@ public void onContainerStatusReceived(ContainerId containerId,
actualQuerySuccess.addAndGet(1);
actualQuerySuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
- asyncClient.stopContainer(containerId, nodeId, containerToken);
+ asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -285,7 +290,7 @@ public void onStartContainerError(ContainerId containerId, Throwable t) {
actualStartFailure.addAndGet(1);
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests
- asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+ asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -426,22 +431,22 @@ public void testOutOfOrder() throws Exception {
Thread t = new Thread() {
@Override
public void run() {
- asyncClient.startContainer(container, clc);
+ asyncClient.startContainerAsync(container, clc);
}
};
t.start();
barrierA.await();
- asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
container.getContainerToken());
barrierC.await();
Assert.assertFalse("Starting and stopping should be out of order",
- ((TestCallbackHandler2) asyncClient.callbackHandler)
+ ((TestCallbackHandler2) asyncClient.getCallbackHandler())
.exceptionOccurred.get());
}
- private class MockNMClientAsync2 extends NMClientAsync {
+ private class MockNMClientAsync2 extends NMClientAsyncImpl {
private CyclicBarrier barrierA;
private CyclicBarrier barrierB;
@@ -510,7 +515,7 @@ public void onContainerStopped(ContainerId containerId) {
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
- if (!t.getMessage().equals(NMClientAsync.StatefulContainer
+ if (!t.getMessage().equals(NMClientAsyncImpl.StatefulContainer
.OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
exceptionOccurred.set(true);
return;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
similarity index 98%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 84a7252bf2..9398f984d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -57,8 +57,11 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
similarity index 91%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index ab2de5bc93..735b8edaa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import java.util.Arrays;
import java.util.List;
@@ -29,9 +29,10 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.junit.Test;
-import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
import static org.junit.Assert.assertEquals;
public class TestAMRMClientContainerRequest {
@@ -72,8 +73,8 @@ public void reloadCachedMappings() {}
private void verifyResourceRequestLocation(
AMRMClientImpl client, ContainerRequest request,
String location) {
- ResourceRequest ask = client.remoteRequestsTable.get(request.priority)
- .get(location).get(request.capability).remoteRequest;
+ ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
+ .get(location).get(request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(request.getContainerCount(), ask.getNumContainers());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index de4546123d..232903b4ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -51,7 +51,13 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -164,9 +170,9 @@ private void stopNmClient(boolean stopContainers) {
// leave one unclosed
assertEquals(1, nmClient.startedContainers.size());
// default true
- assertTrue(nmClient.cleanupRunningContainers.get());
+ assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.cleanupRunningContainersOnStop(stopContainers);
- assertEquals(stopContainers, nmClient.cleanupRunningContainers.get());
+ assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}
@@ -201,7 +207,7 @@ public void testNMClient()
// stop the running containers on close
assertFalse(nmClient.startedContainers.isEmpty());
nmClient.cleanupRunningContainersOnStop(true);
- assertTrue(nmClient.cleanupRunningContainers.get());
+ assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
similarity index 97%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 768371be0b..1c9b750558 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -38,6 +38,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index 49d7867767..365bc8e0bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -46,7 +46,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.Test;