YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks. (Sandy Ryza via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1459555 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
036c168f4f
commit
28bac40295
@ -88,6 +88,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via
|
||||
hitesh)
|
||||
|
||||
YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks.
|
||||
(Sandy Ryza via bikas)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -63,12 +63,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
@ -147,8 +147,8 @@ public class ApplicationMaster {
|
||||
private YarnRPC rpc;
|
||||
|
||||
// Handle to communicate with the Resource Manager
|
||||
private AMRMClient resourceManager;
|
||||
|
||||
private AMRMClientAsync resourceManager;
|
||||
|
||||
// Application Attempt Id ( combination of attemptId and fail count )
|
||||
private ApplicationAttemptId appAttemptID;
|
||||
|
||||
@ -169,8 +169,6 @@ public class ApplicationMaster {
|
||||
// Priority of the request
|
||||
private int requestPriority;
|
||||
|
||||
// Simple flag to denote whether all works is done
|
||||
private boolean appDone = false;
|
||||
// Counter for completed containers ( complete denotes successful or failed )
|
||||
private AtomicInteger numCompletedContainers = new AtomicInteger();
|
||||
// Allocated container count so that we know how many containers has the RM
|
||||
@ -201,6 +199,9 @@ public class ApplicationMaster {
|
||||
// Hardcoded path to shell script in launch container's local env
|
||||
private final String ExecShellStringPath = "ExecShellScript.sh";
|
||||
|
||||
private volatile boolean done;
|
||||
private volatile boolean success;
|
||||
|
||||
// Launch threads
|
||||
private List<Thread> launchThreads = new ArrayList<Thread>();
|
||||
|
||||
@ -416,226 +417,202 @@ private void printUsage(Options opts) {
|
||||
public boolean run() throws YarnRemoteException {
|
||||
LOG.info("Starting ApplicationMaster");
|
||||
|
||||
// Connect to ResourceManager
|
||||
resourceManager = new AMRMClientImpl(appAttemptID);
|
||||
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
||||
|
||||
resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
|
||||
resourceManager.init(conf);
|
||||
resourceManager.start();
|
||||
|
||||
try {
|
||||
// Setup local RPC Server to accept status requests directly from clients
|
||||
// TODO need to setup a protocol for client to be able to communicate to
|
||||
// the RPC server
|
||||
// TODO use the rpc port info to register with the RM for the client to
|
||||
// send requests to this app master
|
||||
// Setup local RPC Server to accept status requests directly from clients
|
||||
// TODO need to setup a protocol for client to be able to communicate to
|
||||
// the RPC server
|
||||
// TODO use the rpc port info to register with the RM for the client to
|
||||
// send requests to this app master
|
||||
|
||||
// Register self with ResourceManager
|
||||
RegisterApplicationMasterResponse response = resourceManager
|
||||
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
||||
appMasterTrackingUrl);
|
||||
// Dump out information about cluster capability as seen by the
|
||||
// resource manager
|
||||
int minMem = response.getMinimumResourceCapability().getMemory();
|
||||
int maxMem = response.getMaximumResourceCapability().getMemory();
|
||||
LOG.info("Min mem capabililty of resources in this cluster " + minMem);
|
||||
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
|
||||
// Register self with ResourceManager
|
||||
// This will start heartbeating to the RM
|
||||
RegisterApplicationMasterResponse response = resourceManager
|
||||
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
||||
appMasterTrackingUrl);
|
||||
// Dump out information about cluster capability as seen by the
|
||||
// resource manager
|
||||
int minMem = response.getMinimumResourceCapability().getMemory();
|
||||
int maxMem = response.getMaximumResourceCapability().getMemory();
|
||||
LOG.info("Min mem capabililty of resources in this cluster " + minMem);
|
||||
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
|
||||
|
||||
// A resource ask has to be atleast the minimum of the capability of the
|
||||
// cluster, the value has to be a multiple of the min value and cannot
|
||||
// exceed the max.
|
||||
// If it is not an exact multiple of min, the RM will allocate to the
|
||||
// nearest multiple of min
|
||||
if (containerMemory < minMem) {
|
||||
LOG.info("Container memory specified below min threshold of cluster."
|
||||
+ " Using min value." + ", specified=" + containerMemory + ", min="
|
||||
+ minMem);
|
||||
containerMemory = minMem;
|
||||
} else if (containerMemory > maxMem) {
|
||||
LOG.info("Container memory specified above max threshold of cluster."
|
||||
+ " Using max value." + ", specified=" + containerMemory + ", max="
|
||||
+ maxMem);
|
||||
containerMemory = maxMem;
|
||||
}
|
||||
|
||||
// Setup heartbeat emitter
|
||||
// TODO poll RM every now and then with an empty request to let RM know
|
||||
// that we are alive
|
||||
// The heartbeat interval after which an AM is timed out by the RM is
|
||||
// defined by a config setting:
|
||||
// RM_AM_EXPIRY_INTERVAL_MS with default defined by
|
||||
// DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
|
||||
// The allocate calls to the RM count as heartbeats so, for now,
|
||||
// this additional heartbeat emitter is not required.
|
||||
|
||||
// Setup ask for containers from RM
|
||||
// Send request for containers to RM
|
||||
// Until we get our fully allocated quota, we keep on polling RM for
|
||||
// containers
|
||||
// Keep looping until all the containers are launched and shell script
|
||||
// executed on them ( regardless of success/failure).
|
||||
|
||||
int loopCounter = -1;
|
||||
|
||||
while (numCompletedContainers.get() < numTotalContainers && !appDone) {
|
||||
loopCounter++;
|
||||
|
||||
// log current state
|
||||
LOG.info("Current application state: loop=" + loopCounter
|
||||
+ ", appDone=" + appDone + ", total=" + numTotalContainers
|
||||
+ ", requested=" + numRequestedContainers + ", completed="
|
||||
+ numCompletedContainers + ", failed=" + numFailedContainers
|
||||
+ ", currentAllocated=" + numAllocatedContainers);
|
||||
|
||||
// Sleep before each loop when asking RM for containers
|
||||
// to avoid flooding RM with spurious requests when it
|
||||
// need not have any available containers
|
||||
// Sleeping for 1000 ms.
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Sleep interrupted " + e.getMessage());
|
||||
}
|
||||
|
||||
// No. of containers to request
|
||||
// For the first loop, askCount will be equal to total containers needed
|
||||
// From that point on, askCount will always be 0 as current
|
||||
// implementation does not change its ask on container failures.
|
||||
int askCount = numTotalContainers - numRequestedContainers.get();
|
||||
numRequestedContainers.addAndGet(askCount);
|
||||
|
||||
if (askCount > 0) {
|
||||
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
}
|
||||
|
||||
// Send the request to RM
|
||||
LOG.info("Asking RM for containers" + ", askCount=" + askCount);
|
||||
AllocateResponse allocResp = sendContainerAskToRM();
|
||||
|
||||
// Retrieve list of allocated containers from the response
|
||||
List<Container> allocatedContainers =
|
||||
allocResp.getAllocatedContainers();
|
||||
LOG.info("Got response from RM for container ask, allocatedCnt="
|
||||
+ allocatedContainers.size());
|
||||
numAllocatedContainers.addAndGet(allocatedContainers.size());
|
||||
for (Container allocatedContainer : allocatedContainers) {
|
||||
LOG.info("Launching shell command on a new container."
|
||||
+ ", containerId=" + allocatedContainer.getId()
|
||||
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
|
||||
+ ":" + allocatedContainer.getNodeId().getPort()
|
||||
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
||||
+ ", containerState" + allocatedContainer.getState()
|
||||
+ ", containerResourceMemory"
|
||||
+ allocatedContainer.getResource().getMemory());
|
||||
// + ", containerToken"
|
||||
// +allocatedContainer.getContainerToken().getIdentifier().toString());
|
||||
|
||||
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
|
||||
allocatedContainer);
|
||||
Thread launchThread = new Thread(runnableLaunchContainer);
|
||||
|
||||
// launch and start the container on a separate thread to keep
|
||||
// the main thread unblocked
|
||||
// as all containers may not be allocated at one go.
|
||||
launchThreads.add(launchThread);
|
||||
launchThread.start();
|
||||
}
|
||||
|
||||
// Check what the current available resources in the cluster are
|
||||
// TODO should we do anything if the available resources are not enough?
|
||||
Resource availableResources = allocResp.getAvailableResources();
|
||||
LOG.info("Current available resources in the cluster "
|
||||
+ availableResources);
|
||||
|
||||
// Check the completed containers
|
||||
List<ContainerStatus> completedContainers = allocResp
|
||||
.getCompletedContainersStatuses();
|
||||
LOG.info("Got response from RM for container ask, completedCnt="
|
||||
+ completedContainers.size());
|
||||
for (ContainerStatus containerStatus : completedContainers) {
|
||||
LOG.info("Got container status for containerID="
|
||||
+ containerStatus.getContainerId() + ", state="
|
||||
+ containerStatus.getState() + ", exitStatus="
|
||||
+ containerStatus.getExitStatus() + ", diagnostics="
|
||||
+ containerStatus.getDiagnostics());
|
||||
|
||||
// non complete containers should not be here
|
||||
assert (containerStatus.getState() == ContainerState.COMPLETE);
|
||||
|
||||
// increment counters for completed/failed containers
|
||||
int exitStatus = containerStatus.getExitStatus();
|
||||
if (0 != exitStatus) {
|
||||
// container failed
|
||||
if (-100 != exitStatus) {
|
||||
// shell script failed
|
||||
// counts as completed
|
||||
numCompletedContainers.incrementAndGet();
|
||||
numFailedContainers.incrementAndGet();
|
||||
} else {
|
||||
// something else bad happened
|
||||
// app job did not complete for some reason
|
||||
// we should re-try as the container was lost for some reason
|
||||
numAllocatedContainers.decrementAndGet();
|
||||
numRequestedContainers.decrementAndGet();
|
||||
// we do not need to release the container as it would be done
|
||||
// by the RM/CM.
|
||||
}
|
||||
} else {
|
||||
// nothing to do
|
||||
// container completed successfully
|
||||
numCompletedContainers.incrementAndGet();
|
||||
LOG.info("Container completed successfully." + ", containerId="
|
||||
+ containerStatus.getContainerId());
|
||||
}
|
||||
}
|
||||
if (numCompletedContainers.get() == numTotalContainers) {
|
||||
appDone = true;
|
||||
}
|
||||
|
||||
LOG.info("Current application state: loop=" + loopCounter
|
||||
+ ", appDone=" + appDone + ", total=" + numTotalContainers
|
||||
+ ", requested=" + numRequestedContainers + ", completed="
|
||||
+ numCompletedContainers + ", failed=" + numFailedContainers
|
||||
+ ", currentAllocated=" + numAllocatedContainers);
|
||||
|
||||
// TODO
|
||||
// Add a timeout handling layer
|
||||
// for misbehaving shell commands
|
||||
}
|
||||
|
||||
// Join all launched threads
|
||||
// needed for when we time out
|
||||
// and we need to release containers
|
||||
for (Thread launchThread : launchThreads) {
|
||||
try {
|
||||
launchThread.join(10000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Exception thrown in thread join: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
// When the application completes, it should send a finish application
|
||||
// signal to the RM
|
||||
LOG.info("Application completed. Signalling finish to RM");
|
||||
|
||||
FinalApplicationStatus appStatus;
|
||||
String appMessage = null;
|
||||
boolean isSuccess = true;
|
||||
if (numFailedContainers.get() == 0) {
|
||||
appStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
} else {
|
||||
appStatus = FinalApplicationStatus.FAILED;
|
||||
appMessage = "Diagnostics." + ", total=" + numTotalContainers
|
||||
+ ", completed=" + numCompletedContainers.get() + ", allocated="
|
||||
+ numAllocatedContainers.get() + ", failed="
|
||||
+ numFailedContainers.get();
|
||||
isSuccess = false;
|
||||
}
|
||||
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
|
||||
return isSuccess;
|
||||
} finally {
|
||||
resourceManager.stop();
|
||||
// A resource ask has to be atleast the minimum of the capability of the
|
||||
// cluster, the value has to be a multiple of the min value and cannot
|
||||
// exceed the max.
|
||||
// If it is not an exact multiple of min, the RM will allocate to the
|
||||
// nearest multiple of min
|
||||
if (containerMemory < minMem) {
|
||||
LOG.info("Container memory specified below min threshold of cluster."
|
||||
+ " Using min value." + ", specified=" + containerMemory + ", min="
|
||||
+ minMem);
|
||||
containerMemory = minMem;
|
||||
} else if (containerMemory > maxMem) {
|
||||
LOG.info("Container memory specified above max threshold of cluster."
|
||||
+ " Using max value." + ", specified=" + containerMemory + ", max="
|
||||
+ maxMem);
|
||||
containerMemory = maxMem;
|
||||
}
|
||||
|
||||
|
||||
// Setup ask for containers from RM
|
||||
// Send request for containers to RM
|
||||
// Until we get our fully allocated quota, we keep on polling RM for
|
||||
// containers
|
||||
// Keep looping until all the containers are launched and shell script
|
||||
// executed on them ( regardless of success/failure).
|
||||
ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
numRequestedContainers.set(numTotalContainers);
|
||||
|
||||
while (!done) {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException ex) {}
|
||||
}
|
||||
finish();
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
private void finish() {
|
||||
// Join all launched threads
|
||||
// needed for when we time out
|
||||
// and we need to release containers
|
||||
for (Thread launchThread : launchThreads) {
|
||||
try {
|
||||
launchThread.join(10000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Exception thrown in thread join: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
// When the application completes, it should send a finish application
|
||||
// signal to the RM
|
||||
LOG.info("Application completed. Signalling finish to RM");
|
||||
|
||||
FinalApplicationStatus appStatus;
|
||||
String appMessage = null;
|
||||
success = true;
|
||||
if (numFailedContainers.get() == 0) {
|
||||
appStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
} else {
|
||||
appStatus = FinalApplicationStatus.FAILED;
|
||||
appMessage = "Diagnostics." + ", total=" + numTotalContainers
|
||||
+ ", completed=" + numCompletedContainers.get() + ", allocated="
|
||||
+ numAllocatedContainers.get() + ", failed="
|
||||
+ numFailedContainers.get();
|
||||
success = false;
|
||||
}
|
||||
try {
|
||||
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
|
||||
} catch (YarnRemoteException ex) {
|
||||
LOG.error("Failed to unregister application", ex);
|
||||
}
|
||||
|
||||
done = true;
|
||||
resourceManager.stop();
|
||||
}
|
||||
|
||||
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||
@Override
|
||||
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
|
||||
LOG.info("Got response from RM for container ask, completedCnt="
|
||||
+ completedContainers.size());
|
||||
for (ContainerStatus containerStatus : completedContainers) {
|
||||
LOG.info("Got container status for containerID="
|
||||
+ containerStatus.getContainerId() + ", state="
|
||||
+ containerStatus.getState() + ", exitStatus="
|
||||
+ containerStatus.getExitStatus() + ", diagnostics="
|
||||
+ containerStatus.getDiagnostics());
|
||||
|
||||
// non complete containers should not be here
|
||||
assert (containerStatus.getState() == ContainerState.COMPLETE);
|
||||
|
||||
// increment counters for completed/failed containers
|
||||
int exitStatus = containerStatus.getExitStatus();
|
||||
if (0 != exitStatus) {
|
||||
// container failed
|
||||
if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
|
||||
// shell script failed
|
||||
// counts as completed
|
||||
numCompletedContainers.incrementAndGet();
|
||||
numFailedContainers.incrementAndGet();
|
||||
} else {
|
||||
// container was killed by framework, possibly preempted
|
||||
// we should re-try as the container was lost for some reason
|
||||
numAllocatedContainers.decrementAndGet();
|
||||
numRequestedContainers.decrementAndGet();
|
||||
// we do not need to release the container as it would be done
|
||||
// by the RM
|
||||
}
|
||||
} else {
|
||||
// nothing to do
|
||||
// container completed successfully
|
||||
numCompletedContainers.incrementAndGet();
|
||||
LOG.info("Container completed successfully." + ", containerId="
|
||||
+ containerStatus.getContainerId());
|
||||
}
|
||||
}
|
||||
|
||||
// ask for more containers if any failed
|
||||
int askCount = numTotalContainers - numRequestedContainers.get();
|
||||
numRequestedContainers.addAndGet(askCount);
|
||||
|
||||
if (askCount > 0) {
|
||||
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
}
|
||||
|
||||
// set progress to deliver to RM on next heartbeat
|
||||
float progress = (float) numCompletedContainers.get()
|
||||
/ numTotalContainers;
|
||||
resourceManager.setProgress(progress);
|
||||
|
||||
if (numCompletedContainers.get() == numTotalContainers) {
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersAllocated(List<Container> allocatedContainers) {
|
||||
LOG.info("Got response from RM for container ask, allocatedCnt="
|
||||
+ allocatedContainers.size());
|
||||
numAllocatedContainers.addAndGet(allocatedContainers.size());
|
||||
for (Container allocatedContainer : allocatedContainers) {
|
||||
LOG.info("Launching shell command on a new container."
|
||||
+ ", containerId=" + allocatedContainer.getId()
|
||||
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
|
||||
+ ":" + allocatedContainer.getNodeId().getPort()
|
||||
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
||||
+ ", containerState" + allocatedContainer.getState()
|
||||
+ ", containerResourceMemory"
|
||||
+ allocatedContainer.getResource().getMemory());
|
||||
// + ", containerToken"
|
||||
// +allocatedContainer.getContainerToken().getIdentifier().toString());
|
||||
|
||||
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
|
||||
allocatedContainer);
|
||||
Thread launchThread = new Thread(runnableLaunchContainer);
|
||||
|
||||
// launch and start the container on a separate thread to keep
|
||||
// the main thread unblocked
|
||||
// as all containers may not be allocated at one go.
|
||||
launchThreads.add(launchThread);
|
||||
launchThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRebootRequest() {}
|
||||
|
||||
@Override
|
||||
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -811,21 +788,4 @@ private ContainerRequest setupContainerAskForRM(int numContainers) {
|
||||
LOG.info("Requested container ask: " + request.toString());
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ask RM to allocate given no. of containers to this Application Master
|
||||
*
|
||||
* @param requestedContainers Containers to ask for from RM
|
||||
* @return Response from RM to AM with allocated containers
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
|
||||
float progressIndicator = (float) numCompletedContainers.get()
|
||||
/ numTotalContainers;
|
||||
|
||||
LOG.info("Sending request to RM for containers" + ", progress="
|
||||
+ progressIndicator);
|
||||
|
||||
return resourceManager.allocate(progressIndicator);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,354 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
|
||||
* and provides asynchronous updates on events such as container allocations and
|
||||
* completions. It contains a thread that sends periodic heartbeats to the
|
||||
* ResourceManager.
|
||||
*
|
||||
* It should be used by implementing a CallbackHandler:
|
||||
* <pre>
|
||||
* {@code
|
||||
* class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||
* public void onContainersAllocated(List<Container> containers) {
|
||||
* [run tasks on the containers]
|
||||
* }
|
||||
*
|
||||
* public void onContainersCompleted(List<ContainerStatus> statuses) {
|
||||
* [update progress, check whether app is done]
|
||||
* }
|
||||
*
|
||||
* public void onNodesUpdated(List<NodeReport> updated) {}
|
||||
*
|
||||
* public void onReboot() {}
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* The client's lifecycle should be managed similarly to the following:
|
||||
*
|
||||
* <pre>
|
||||
* {@code
|
||||
* AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
|
||||
* asyncClient.init(conf);
|
||||
* asyncClient.start();
|
||||
* RegisterApplicationMasterResponse response = asyncClient
|
||||
* .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
||||
* appMasterTrackingUrl);
|
||||
* asyncClient.addContainerRequest(containerRequest);
|
||||
* [... wait for application to complete]
|
||||
* asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
|
||||
* asyncClient.stop();
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
@Unstable
|
||||
@Evolving
|
||||
public class AMRMClientAsync extends AbstractService {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
|
||||
|
||||
private final AMRMClient client;
|
||||
private final int intervalMs;
|
||||
private final HeartbeatThread heartbeatThread;
|
||||
private final CallbackHandlerThread handlerThread;
|
||||
private final CallbackHandler handler;
|
||||
|
||||
private final BlockingQueue<AllocateResponse> responseQueue;
|
||||
|
||||
private volatile boolean keepRunning;
|
||||
private volatile float progress;
|
||||
|
||||
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
|
||||
CallbackHandler callbackHandler) {
|
||||
this(new AMRMClientImpl(id), intervalMs, callbackHandler);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
AMRMClientAsync(AMRMClient client, int intervalMs,
|
||||
CallbackHandler callbackHandler) {
|
||||
super(AMRMClientAsync.class.getName());
|
||||
this.client = client;
|
||||
this.intervalMs = intervalMs;
|
||||
handler = callbackHandler;
|
||||
heartbeatThread = new HeartbeatThread();
|
||||
handlerThread = new CallbackHandlerThread();
|
||||
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
|
||||
keepRunning = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the application's current progress. It will be transmitted to the
|
||||
* resource manager on the next heartbeat.
|
||||
* @param progress
|
||||
* the application's progress so far
|
||||
*/
|
||||
public void setProgress(float progress) {
|
||||
this.progress = progress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
super.init(conf);
|
||||
client.init(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
handlerThread.start();
|
||||
client.start();
|
||||
super.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells the heartbeat and handler threads to stop and waits for them to
|
||||
* terminate. Calling this method from the callback handler thread would cause
|
||||
* deadlock, and thus should be avoided.
|
||||
*/
|
||||
@Override
|
||||
public void stop() {
|
||||
if (Thread.currentThread() == handlerThread) {
|
||||
throw new YarnException("Cannot call stop from callback handler thread!");
|
||||
}
|
||||
keepRunning = false;
|
||||
try {
|
||||
heartbeatThread.join();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Error joining with heartbeat thread", ex);
|
||||
}
|
||||
client.stop();
|
||||
try {
|
||||
handlerThread.interrupt();
|
||||
handlerThread.join();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Error joining with hander thread", ex);
|
||||
}
|
||||
super.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers this application master with the resource manager. On successful
|
||||
* registration, starts the heartbeating thread.
|
||||
*/
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
String appHostName, int appHostPort, String appTrackingUrl)
|
||||
throws YarnRemoteException {
|
||||
RegisterApplicationMasterResponse response =
|
||||
client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
|
||||
heartbeatThread.start();
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister the application master. This must be called in the end.
|
||||
* @param appStatus Success/Failure status of the master
|
||||
* @param appMessage Diagnostics message on failure
|
||||
* @param appTrackingUrl New URL to get master info
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
|
||||
String appMessage, String appTrackingUrl) throws YarnRemoteException {
|
||||
synchronized (client) {
|
||||
keepRunning = false;
|
||||
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request containers for resources before calling <code>allocate</code>
|
||||
* @param req Resource request
|
||||
*/
|
||||
public void addContainerRequest(AMRMClient.ContainerRequest req) {
|
||||
client.addContainerRequest(req);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove previous container request. The previous container request may have
|
||||
* already been sent to the ResourceManager. So even after the remove request
|
||||
* the app must be prepared to receive an allocation for the previous request
|
||||
* even after the remove request
|
||||
* @param req Resource request
|
||||
*/
|
||||
public void removeContainerRequest(AMRMClient.ContainerRequest req) {
|
||||
client.removeContainerRequest(req);
|
||||
}
|
||||
|
||||
/**
|
||||
* Release containers assigned by the Resource Manager. If the app cannot use
|
||||
* the container or wants to give up the container then it can release them.
|
||||
* The app needs to make new requests for the released resource capability if
|
||||
* it still needs it. eg. it released non-local resources
|
||||
* @param containerId
|
||||
*/
|
||||
public void releaseAssignedContainer(ContainerId containerId) {
|
||||
client.releaseAssignedContainer(containerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the currently available resources in the cluster.
|
||||
* A valid value is available after a call to allocate has been made
|
||||
* @return Currently available resources
|
||||
*/
|
||||
public Resource getClusterAvailableResources() {
|
||||
return client.getClusterAvailableResources();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current number of nodes in the cluster.
|
||||
* A valid values is available after a call to allocate has been made
|
||||
* @return Current number of nodes in the cluster
|
||||
*/
|
||||
public int getClusterNodeCount() {
|
||||
return client.getClusterNodeCount();
|
||||
}
|
||||
|
||||
private class HeartbeatThread extends Thread {
|
||||
public HeartbeatThread() {
|
||||
super("AMRM Heartbeater thread");
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
AllocateResponse response = null;
|
||||
// synchronization ensures we don't send heartbeats after unregistering
|
||||
synchronized (client) {
|
||||
if (!keepRunning) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
response = client.allocate(progress);
|
||||
} catch (YarnRemoteException ex) {
|
||||
LOG.error("Failed to heartbeat", ex);
|
||||
}
|
||||
}
|
||||
if (response != null) {
|
||||
while (true) {
|
||||
try {
|
||||
responseQueue.put(response);
|
||||
break;
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Interrupted while waiting to put on response queue", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(intervalMs);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Heartbeater interrupted", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class CallbackHandlerThread extends Thread {
|
||||
public CallbackHandlerThread() {
|
||||
super("AMRM Callback Handler Thread");
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (keepRunning) {
|
||||
AllocateResponse response;
|
||||
try {
|
||||
response = responseQueue.take();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.info("Interrupted while waiting for queue");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (response.getReboot()) {
|
||||
handler.onRebootRequest();
|
||||
}
|
||||
List<NodeReport> updatedNodes = response.getUpdatedNodes();
|
||||
if (!updatedNodes.isEmpty()) {
|
||||
handler.onNodesUpdated(updatedNodes);
|
||||
}
|
||||
|
||||
List<ContainerStatus> completed =
|
||||
response.getCompletedContainersStatuses();
|
||||
if (!completed.isEmpty()) {
|
||||
handler.onContainersCompleted(completed);
|
||||
}
|
||||
|
||||
List<Container> allocated = response.getAllocatedContainers();
|
||||
if (!allocated.isEmpty()) {
|
||||
handler.onContainersAllocated(allocated);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface CallbackHandler {
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with completed
|
||||
* containers. If the response contains both completed containers and
|
||||
* allocated containers, this will be called before containersAllocated.
|
||||
*/
|
||||
public void onContainersCompleted(List<ContainerStatus> statuses);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with allocated
|
||||
* containers. If the response containers both completed containers and
|
||||
* allocated containers, this will be called after containersCompleted.
|
||||
*/
|
||||
public void onContainersAllocated(List<Container> containers);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager wants the ApplicationMaster to reboot
|
||||
* for being out of sync.
|
||||
*/
|
||||
public void onRebootRequest();
|
||||
|
||||
/**
|
||||
* Called when nodes tracked by the ResourceManager have changed in in health,
|
||||
* availability etc.
|
||||
*/
|
||||
public void onNodesUpdated(List<NodeReport> updatedNodes);
|
||||
}
|
||||
}
|
@ -0,0 +1,184 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import static org.mockito.Mockito.anyFloat;
|
||||
import static org.mockito.Mockito.anyInt;
|
||||
import static org.mockito.Mockito.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestAMRMClientAsync {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testAMRMClientAsync() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
List<ContainerStatus> completed1 = Arrays.asList(
|
||||
BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(0, 0, 0, 0),
|
||||
ContainerState.COMPLETE, "", 0));
|
||||
List<Container> allocated1 = Arrays.asList(
|
||||
BuilderUtils.newContainer(null, null, null, null, null, null));
|
||||
final AllocateResponse response1 = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), allocated1);
|
||||
final AllocateResponse response2 = createAllocateResponse(completed1,
|
||||
new ArrayList<Container>());
|
||||
final AllocateResponse emptyResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
||||
|
||||
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||
AMRMClient client = mock(AMRMClient.class);
|
||||
final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
|
||||
when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
|
||||
@Override
|
||||
public AllocateResponse answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
secondHeartbeatReceived.set(true);
|
||||
return response2;
|
||||
}
|
||||
}).thenReturn(emptyResponse);
|
||||
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
|
||||
.thenReturn(null);
|
||||
|
||||
AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
|
||||
asyncClient.init(conf);
|
||||
asyncClient.start();
|
||||
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
||||
|
||||
// while the CallbackHandler will still only be processing the first response,
|
||||
// heartbeater thread should still be sending heartbeats.
|
||||
// To test this, wait for the second heartbeat to be received.
|
||||
while (!secondHeartbeatReceived.get()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
// allocated containers should come before completed containers
|
||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||
|
||||
// wait for the allocated containers from the first heartbeat's response
|
||||
while (callbackHandler.takeAllocatedContainers() == null) {
|
||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
// wait for the completed containers from the second heartbeat's response
|
||||
while (callbackHandler.takeCompletedContainers() == null) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
asyncClient.stop();
|
||||
|
||||
Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
|
||||
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
|
||||
}
|
||||
|
||||
private AllocateResponse createAllocateResponse(
|
||||
List<ContainerStatus> completed, List<Container> allocated) {
|
||||
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
|
||||
new ArrayList<NodeReport>(), null, false, 1);
|
||||
return response;
|
||||
}
|
||||
|
||||
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||
private volatile List<ContainerStatus> completedContainers;
|
||||
private volatile List<Container> allocatedContainers;
|
||||
|
||||
public List<ContainerStatus> takeCompletedContainers() {
|
||||
List<ContainerStatus> ret = completedContainers;
|
||||
if (ret == null) {
|
||||
return null;
|
||||
}
|
||||
completedContainers = null;
|
||||
synchronized (ret) {
|
||||
ret.notify();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public List<Container> takeAllocatedContainers() {
|
||||
List<Container> ret = allocatedContainers;
|
||||
if (ret == null) {
|
||||
return null;
|
||||
}
|
||||
allocatedContainers = null;
|
||||
synchronized (ret) {
|
||||
ret.notify();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersCompleted(List<ContainerStatus> statuses) {
|
||||
completedContainers = statuses;
|
||||
// wait for containers to be taken before returning
|
||||
synchronized (completedContainers) {
|
||||
while (completedContainers != null) {
|
||||
try {
|
||||
completedContainers.wait();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Interrupted during wait", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersAllocated(List<Container> containers) {
|
||||
allocatedContainers = containers;
|
||||
// wait for containers to be taken before returning
|
||||
synchronized (allocatedContainers) {
|
||||
while (allocatedContainers != null) {
|
||||
try {
|
||||
allocatedContainers.wait();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Interrupted during wait", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRebootRequest() {}
|
||||
|
||||
@Override
|
||||
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -404,4 +405,21 @@ public static AllocateRequest newAllocateRequest(
|
||||
allocateRequest.addAllReleases(containersToBeReleased);
|
||||
return allocateRequest;
|
||||
}
|
||||
|
||||
public static AllocateResponse newAllocateResponse(int responseId,
|
||||
List<ContainerStatus> completedContainers,
|
||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||
Resource availResources, boolean reboot, int numClusterNodes) {
|
||||
AllocateResponse response = recordFactory
|
||||
.newRecordInstance(AllocateResponse.class);
|
||||
response.setNumClusterNodes(numClusterNodes);
|
||||
response.setResponseId(responseId);
|
||||
response.setCompletedContainersStatuses(completedContainers);
|
||||
response.setAllocatedContainers(allocatedContainers);
|
||||
response.setUpdatedNodes(updatedNodes);
|
||||
response.setAvailableResources(availResources);
|
||||
response.setReboot(reboot);
|
||||
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user