diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 98b416da2e..7a64b83867 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -28,6 +28,8 @@ Release 2.0.3-alpha - Unreleased
YARN-230. RM Restart phase 1 - includes support for saving/restarting all
applications on an RM bounce. (Bikas Saha via acmurthy)
+ YARN-103. Add a yarn AM-RM client module. (Bikas Saha via sseth)
+
IMPROVEMENTS
YARN-223. Update process tree instead of getting new process trees.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 10cf85b657..97ff1f7b5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -36,7 +36,13 @@
org.apache.hadoop
hadoop-yarn-server-resourcemanager
- test
+ test
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-tests
+ test
+ test-jar
org.apache.hadoop
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
new file mode 100644
index 0000000000..1fa86d2cc2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
@@ -0,0 +1,153 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AMRMClient extends Service {
+
+ /**
+ * Value used to define no locality
+ */
+ static final String ANY = "*";
+
+ /**
+ * Object to represent container request for resources.
+ * Resources may be localized to nodes and racks.
+ * Resources may be assigned priorities.
+ * Can ask for multiple containers of a given type.
+ */
+ public static class ContainerRequest {
+ Resource capability;
+ String[] hosts;
+ String[] racks;
+ Priority priority;
+ int containerCount;
+
+ public ContainerRequest(Resource capability, String[] hosts,
+ String[] racks, Priority priority, int containerCount) {
+ this.capability = capability;
+ this.hosts = (hosts != null ? hosts.clone() : null);
+ this.racks = (racks != null ? racks.clone() : null);
+ this.priority = priority;
+ this.containerCount = containerCount;
+ }
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ sb.append("ContainerCount[").append(containerCount).append("]");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Register the application master. This must be called before any
+ * other interaction
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @return RegisterApplicationMasterResponse
+ * @throws YarnRemoteException
+ */
+ public RegisterApplicationMasterResponse
+ registerApplicationMaster(String appHostName,
+ int appHostPort,
+ String appTrackingUrl)
+ throws YarnRemoteException;
+
+ /**
+ * Request additional containers and receive new container allocations.
+ * Requests made via addContainerRequest
are sent to the
+ * ResourceManager
. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are
+ * also retrieved.
+ * This also doubles up as a heartbeat to the ResourceManager and must be
+ * made periodically.
+ * The call may not always return any new allocations of containers.
+ * App should not make concurrent allocate requests. May cause request loss.
+ * @param progressIndicator Indicates progress made by the master
+ * @return the response of the allocate request
+ * @throws YarnRemoteException
+ */
+ public AllocateResponse allocate(float progressIndicator)
+ throws YarnRemoteException;
+
+ /**
+ * Unregister the application master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnRemoteException
+ */
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage,
+ String appTrackingUrl)
+ throws YarnRemoteException;
+
+ /**
+ * Request containers for resources before calling allocate
+ * @param req Resource request
+ */
+ public void addContainerRequest(ContainerRequest req);
+
+ /**
+ * Remove previous container request. The previous container request may have
+ * already been sent to the ResourceManager. So even after the remove request
+ * the app must be prepared to receive an allocation for the previous request
+ * even after the remove request
+ * @param req Resource request
+ */
+ public void removeContainerRequest(ContainerRequest req);
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release them.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it. eg. it released non-local resources
+ * @param containerId
+ */
+ public void releaseAssignedContainer(ContainerId containerId);
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Currently available resources
+ */
+ public Resource getClusterAvailableResources();
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid values is available after a call to allocate has been made
+ * @return Current number of nodes in the cluster
+ */
+ public int getClusterNodeCount();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
new file mode 100644
index 0000000000..15c250e8d5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+@Unstable
+public class AMRMClientImpl extends AbstractService implements AMRMClient {
+
+ private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ private int lastResponseId = 0;
+
+ protected AMRMProtocol rmClient;
+ protected final ApplicationAttemptId appAttemptId;
+ protected Resource clusterAvailableResources;
+ protected int clusterNodeCount;
+
+ //Key -> Priority
+ //Value -> Map
+ //Key->ResourceName (e.g., hostname, rackname, *)
+ //Value->Map
+ //Key->Resource Capability
+ //Value->ResourceRequest
+ protected final
+ Map>>
+ remoteRequestsTable =
+ new TreeMap>>();
+
+ protected final Set ask = new TreeSet(
+ new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
+ protected final Set release = new TreeSet();
+
+ public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+ super(AMRMClientImpl.class.getName());
+ this.appAttemptId = appAttemptId;
+ }
+
+ @Override
+ public synchronized void init(Configuration conf) {
+ super.init(conf);
+ }
+
+ @Override
+ public synchronized void start() {
+ final YarnConfiguration conf = new YarnConfiguration(getConfig());
+ final YarnRPC rpc = YarnRPC.create(conf);
+ final InetSocketAddress rmAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+ UserGroupInformation currentUser;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String tokenURLEncodedStr = System.getenv().get(
+ ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+ Token extends TokenIdentifier> token = new Token();
+
+ try {
+ token.decodeFromUrlString(tokenURLEncodedStr);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ SecurityUtil.setTokenService(token, rmAddress);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AppMasterToken is " + token);
+ }
+ currentUser.addToken(token);
+ }
+
+ rmClient = currentUser.doAs(new PrivilegedAction() {
+ @Override
+ public AMRMProtocol run() {
+ return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+ conf);
+ }
+ });
+ LOG.debug("Connecting to ResourceManager at " + rmAddress);
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ RPC.stopProxy(this.rmClient);
+ super.stop();
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl)
+ throws YarnRemoteException {
+ // do this only once ???
+ RegisterApplicationMasterRequest request = recordFactory
+ .newRecordInstance(RegisterApplicationMasterRequest.class);
+ synchronized (this) {
+ request.setApplicationAttemptId(appAttemptId);
+ }
+ request.setHost(appHostName);
+ request.setRpcPort(appHostPort);
+ if(appTrackingUrl != null) {
+ request.setTrackingUrl(appTrackingUrl);
+ }
+ RegisterApplicationMasterResponse response = rmClient
+ .registerApplicationMaster(request);
+ return response;
+ }
+
+ @Override
+ public AllocateResponse allocate(float progressIndicator)
+ throws YarnRemoteException {
+ AllocateResponse allocateResponse = null;
+ ArrayList askList = null;
+ ArrayList releaseList = null;
+ AllocateRequest allocateRequest = null;
+
+ try {
+ synchronized (this) {
+ askList = new ArrayList(ask);
+ releaseList = new ArrayList(release);
+ // optimistically clear this collection assuming no RPC failure
+ ask.clear();
+ release.clear();
+ allocateRequest = BuilderUtils
+ .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+ askList, releaseList);
+ }
+
+ allocateResponse = rmClient.allocate(allocateRequest);
+ AMResponse response = allocateResponse.getAMResponse();
+
+ synchronized (this) {
+ // update these on successful RPC
+ clusterNodeCount = allocateResponse.getNumClusterNodes();
+ lastResponseId = response.getResponseId();
+ clusterAvailableResources = response.getAvailableResources();
+ }
+ } finally {
+ // TODO how to differentiate remote yarn exception vs error in rpc
+ if(allocateResponse == null) {
+ // we hit an exception in allocate()
+ // preserve ask and release for next call to allocate()
+ synchronized (this) {
+ release.addAll(releaseList);
+ // requests could have been added or deleted during call to allocate
+ // If requests were added/removed then there is nothing to do since
+ // the ResourceRequest object in ask would have the actual new value.
+ // If ask does not have this ResourceRequest then it was unchanged and
+ // so we can add the value back safely.
+ // This assumes that there will no concurrent calls to allocate() and
+ // so we dont have to worry about ask being changed in the
+ // synchronized block at the beginning of this method.
+ for(ResourceRequest oldAsk : askList) {
+ if(!ask.contains(oldAsk)) {
+ ask.add(oldAsk);
+ }
+ }
+ }
+ }
+ }
+ return allocateResponse;
+ }
+
+ @Override
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage, String appTrackingUrl) throws YarnRemoteException {
+ FinishApplicationMasterRequest request = recordFactory
+ .newRecordInstance(FinishApplicationMasterRequest.class);
+ request.setAppAttemptId(appAttemptId);
+ request.setFinishApplicationStatus(appStatus);
+ if(appMessage != null) {
+ request.setDiagnostics(appMessage);
+ }
+ if(appTrackingUrl != null) {
+ request.setTrackingUrl(appTrackingUrl);
+ }
+ rmClient.finishApplicationMaster(request);
+ }
+
+ @Override
+ public synchronized void addContainerRequest(ContainerRequest req) {
+ // Create resource requests
+ if(req.hosts != null) {
+ for (String host : req.hosts) {
+ addResourceRequest(req.priority, host, req.capability, req.containerCount);
+ }
+ }
+
+ if(req.racks != null) {
+ for (String rack : req.racks) {
+ addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+ }
+ }
+
+ // Off-switch
+ addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+ }
+
+ @Override
+ public synchronized void removeContainerRequest(ContainerRequest req) {
+ // Update resource requests
+ if(req.hosts != null) {
+ for (String hostName : req.hosts) {
+ decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+ }
+ }
+
+ if(req.racks != null) {
+ for (String rack : req.racks) {
+ decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+ }
+ }
+
+ decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+ }
+
+ @Override
+ public synchronized void releaseAssignedContainer(ContainerId containerId) {
+ release.add(containerId);
+ }
+
+ @Override
+ public synchronized Resource getClusterAvailableResources() {
+ return clusterAvailableResources;
+ }
+
+ @Override
+ public synchronized int getClusterNodeCount() {
+ return clusterNodeCount;
+ }
+
+ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+ // This code looks weird but is needed because of the following scenario.
+ // A ResourceRequest is removed from the remoteRequestTable. A 0 container
+ // request is added to 'ask' to notify the RM about not needing it any more.
+ // Before the call to allocate, the user now requests more containers. If
+ // the locations of the 0 size request and the new request are the same
+ // (with the difference being only container count), then the set comparator
+ // will consider both to be the same and not add the new request to ask. So
+ // we need to check for the "same" request being present and remove it and
+ // then add it back. The comparator is container count agnostic.
+ // This should happen only rarely but we do need to guard against it.
+ if(ask.contains(remoteRequest)) {
+ ask.remove(remoteRequest);
+ }
+ ask.add(remoteRequest);
+ }
+
+ private void addResourceRequest(Priority priority, String resourceName,
+ Resource capability, int containerCount) {
+ Map> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+ if (remoteRequests == null) {
+ remoteRequests = new HashMap>();
+ this.remoteRequestsTable.put(priority, remoteRequests);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added priority=" + priority);
+ }
+ }
+ Map reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ reqMap = new HashMap();
+ remoteRequests.put(resourceName, reqMap);
+ }
+ ResourceRequest remoteRequest = reqMap.get(capability);
+ if (remoteRequest == null) {
+ remoteRequest = BuilderUtils.
+ newResourceRequest(priority, resourceName, capability, 0);
+ reqMap.put(capability, remoteRequest);
+ }
+
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+
+ // Note this down for next interaction with ResourceManager
+ addResourceRequestToAsk(remoteRequest);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addResourceRequest:" + " applicationId="
+ + appAttemptId + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
+ }
+
+ private void decResourceRequest(Priority priority, String resourceName,
+ Resource capability, int containerCount) {
+ Map> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+
+ if(remoteRequests == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as priority " + priority
+ + " is not present in request table");
+ }
+ return;
+ }
+
+ Map reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as " + resourceName
+ + " is not present in request table");
+ }
+ return;
+ }
+ ResourceRequest remoteRequest = reqMap.get(capability);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ + appAttemptId + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
+
+ remoteRequest.
+ setNumContainers(remoteRequest.getNumContainers() - containerCount);
+ if(remoteRequest.getNumContainers() < 0) {
+ // guard against spurious removals
+ remoteRequest.setNumContainers(0);
+ }
+ // send the ResourceRequest to RM even if is 0 because it needs to override
+ // a previously sent value. If ResourceRequest was not sent previously then
+ // sending 0 aught to be a no-op on RM
+ addResourceRequestToAsk(remoteRequest);
+
+ // delete entries from map if no longer needed
+ if (remoteRequest.getNumContainers() == 0) {
+ reqMap.remove(capability);
+ if (reqMap.size() == 0) {
+ remoteRequests.remove(resourceName);
+ }
+ if (remoteRequests.size() == 0) {
+ remoteRequestsTable.remove(priority);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("AFTER decResourceRequest:" + " applicationId="
+ + appAttemptId + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
new file mode 100644
index 0000000000..95fae134b2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.Records;
+
+public class TestAMRMClient {
+ Configuration conf = null;
+ MiniYARNCluster yarnCluster = null;
+ YarnClientImpl yarnClient = null;
+ List nodeReports = null;
+ ApplicationAttemptId attemptId = null;
+ int nodeCount = 3;
+
+ @Before
+ public void setup() throws YarnRemoteException {
+ // start minicluster
+ conf = new YarnConfiguration();
+ yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+
+ // start rm client
+ yarnClient = new YarnClientImpl();
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ // get node info
+ nodeReports = yarnClient.getNodeReports();
+
+ // submit new app
+ GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName("Test");
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ appContext.setPriority(pri);
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+ // unmanaged AM
+ appContext.setUnmanagedAM(true);
+ // Create the request to send to the applications manager
+ SubmitApplicationRequest appRequest = Records
+ .newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+ // Submit the application to the applications manager
+ yarnClient.submitApplication(appContext);
+
+ // wait for app to start
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+ attemptId = appReport.getCurrentApplicationAttemptId();
+ break;
+ }
+ }
+ }
+
+ @After
+ public void tearDown() {
+ if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
+ yarnClient.stop();
+ }
+ if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
+ yarnCluster.stop();
+ }
+ }
+
+ @Test (timeout=60000)
+ public void testAMRMClient() throws YarnRemoteException {
+ AMRMClientImpl amClient = null;
+ try {
+ // start am rm client
+ amClient = new AMRMClientImpl(attemptId);
+ amClient.init(conf);
+ amClient.start();
+
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ testAllocation(amClient);
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+
+ private void testAllocation(final AMRMClientImpl amClient)
+ throws YarnRemoteException {
+ // setup container request
+ final Resource capability = Records.newRecord(Resource.class);
+ final Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(0);
+ capability.setMemory(1024);
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String rack = nodeReports.get(0).getRackName();
+ final String[] nodes = { node };
+ final String[] racks = { rack };
+
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 1));
+ amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 3));
+ amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 2));
+
+ int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
+ .get(node).get(capability).getNumContainers();
+ int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
+ .get(rack).get(capability).getNumContainers();
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+ .get(AMRMClient.ANY).get(capability).getNumContainers();
+
+ assertTrue(containersRequestedNode == 2);
+ assertTrue(containersRequestedRack == 2);
+ assertTrue(containersRequestedAny == 2);
+ assertTrue(amClient.ask.size() == 3);
+ assertTrue(amClient.release.size() == 0);
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 2;
+ Set releases = new TreeSet();
+ while (allocatedContainerCount < containersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ assertTrue(nodeCount == amClient.getClusterNodeCount());
+ AMResponse amResponse = allocResponse.getAMResponse();
+ allocatedContainerCount += amResponse.getAllocatedContainers().size();
+ for(Container container : amResponse.getAllocatedContainers()) {
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+ if(allocatedContainerCount < containersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(1000);
+ }
+ }
+
+ assertTrue(allocatedContainerCount == containersRequestedAny);
+ assertTrue(amClient.release.size() == 2);
+ assertTrue(amClient.ask.size() == 0);
+
+ // need to tell the AMRMClient that we dont need these resources anymore
+ amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 2));
+ assertTrue(amClient.ask.size() == 3);
+ // send 0 container count request for resources that are no longer needed
+ ResourceRequest snoopRequest = amClient.ask.iterator().next();
+ assertTrue(snoopRequest.getNumContainers() == 0);
+
+ // test RPC exception handling
+ amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 2));
+ snoopRequest = amClient.ask.iterator().next();
+ assertTrue(snoopRequest.getNumContainers() == 2);
+
+ AMRMProtocol realRM = amClient.rmClient;
+ try {
+ AMRMProtocol mockRM = mock(AMRMProtocol.class);
+ when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+ new Answer() {
+ public AllocateResponse answer(InvocationOnMock invocation)
+ throws Exception {
+ amClient.removeContainerRequest(new ContainerRequest(capability,
+ nodes, racks, priority, 2));
+ throw new Exception();
+ }
+ });
+ amClient.rmClient = mockRM;
+ amClient.allocate(0.1f);
+ }catch (Exception ioe) {}
+ finally {
+ amClient.rmClient = realRM;
+ }
+
+ assertTrue(amClient.release.size() == 2);
+ assertTrue(amClient.ask.size() == 3);
+ snoopRequest = amClient.ask.iterator().next();
+ // verify that the remove request made in between makeRequest and allocate
+ // has not been lost
+ assertTrue(snoopRequest.getNumContainers() == 0);
+
+ iterationsLeft = 2;
+ // do a few iterations to ensure RM is not going send new containers
+ while(!releases.isEmpty() || iterationsLeft-- > 0) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ AMResponse amResponse = allocResponse.getAMResponse();
+ // RM did not send new containers because AM does not need any
+ assertTrue(amResponse.getAllocatedContainers().size() == 0);
+ if(amResponse.getCompletedContainersStatuses().size() > 0) {
+ for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
+ if(releases.contains(cStatus.getContainerId())) {
+ assertTrue(cStatus.getState() == ContainerState.COMPLETE);
+ assertTrue(cStatus.getExitStatus() == -100);
+ releases.remove(cStatus.getContainerId());
+ }
+ }
+ }
+ if(iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(1000);
+ }
+ }
+
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+ }
+
+ private void sleep(int sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+}