MAPREDUCE-2889. Added documentation for writing new YARN applications. Contributed by Hitesh Shah.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177920 13f79535-47bb-0310-9956-ffa450edef68
parent dc8464f943
commit 7aa8a3c9f8
@@ -330,6 +330,9 @@ Release 0.23.0 - Unreleased
    MAPREDUCE-3098. Fixed RM and MR AM to report YarnApplicationState and
    application's FinalStatus separately. (Hitesh Shah via vinodkv)

    MAPREDUCE-2889. Added documentation for writing new YARN applications.
    (Hitesh Shah via acmurthy)

  OPTIMIZATIONS

    MAPREDUCE-2026. Make JobTracker.getJobCounters() and
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project>
  <parent>
    <artifactId>hadoop-yarn</artifactId>
    <groupId>org.apache.hadoop</groupId>
    <version>${yarn.version}</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-yarn-site</artifactId>
  <name>hadoop-yarn-site</name>

  <properties>
    <install.file>${project.artifact.file}</install.file>
    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
  </properties>

</project>
@@ -0,0 +1,781 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~   http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.

  ---
  Hadoop MapReduce Next Generation-${project.version} - Writing YARN Applications
  ---
  ---
  ${maven.build.timestamp}

Hadoop MapReduce Next Generation - Writing YARN Applications

  \[ {{{./index.html}Go Back}} \]

%{toc|section=1|fromDepth=1}

* Purpose

  This document describes, at a high level, how to implement new
  applications for YARN.

* Concepts and Flow

  The general concept is that an 'Application Submission Client' submits an
  'Application' to the YARN ResourceManager. The client communicates with the
  ResourceManager using the 'ClientRMProtocol' to first acquire a new
  'ApplicationId', if needed, via ClientRMProtocol#getNewApplication and then
  submit the 'Application' to be run via ClientRMProtocol#submitApplication.
  As part of the ClientRMProtocol#submitApplication call, the client needs to
  provide sufficient information to the ResourceManager to 'launch' the
  application's first container, i.e. the ApplicationMaster. You need to
  provide information such as the details of the local files/jars that need
  to be available for your application to run, the actual command that needs
  to be executed (with the necessary command line arguments), any Unix
  environment settings (optional), etc. Effectively, you need to describe
  the Unix process(es) that need to be launched for your ApplicationMaster.

  The YARN ResourceManager will then launch the ApplicationMaster (as
  specified) on an allocated container. The ApplicationMaster is then
  expected to communicate with the ResourceManager using the 'AMRMProtocol'.
  First, the ApplicationMaster needs to register itself with the
  ResourceManager. To complete the task assigned to it, the
  ApplicationMaster can then request and receive containers via
  AMRMProtocol#allocate. After a container is allocated to it, the
  ApplicationMaster communicates with the NodeManager using
  ContainerManager#startContainer to launch the container for its task. As
  part of launching this container, the ApplicationMaster has to specify the
  ContainerLaunchContext which, similar to the ApplicationSubmissionContext,
  holds the launch information such as the command line specification,
  environment, etc. Once the task is completed, the ApplicationMaster has to
  signal its completion to the ResourceManager via
  AMRMProtocol#finishApplicationMaster.

  Meanwhile, the client can monitor the application's status by querying the
  ResourceManager or by directly querying the ApplicationMaster if it
  supports such a service. If needed, it can also kill the application via
  ClientRMProtocol#forceKillApplication.

* Interfaces

  The interfaces you'd most likely be concerned with are:

  * ClientRMProtocol - Client\<--\>ResourceManager\
    The protocol for a client that wishes to communicate with the
    ResourceManager to launch a new application (i.e. the ApplicationMaster),
    check on the status of the application or kill the application. For
    example, a job-client (a job-launching program from the gateway) would
    use this protocol.

  * AMRMProtocol - ApplicationMaster\<--\>ResourceManager\
    The protocol used by the ApplicationMaster to register/unregister itself
    to/from the ResourceManager as well as to request resources from the
    Scheduler to complete its tasks.

  * ContainerManager - ApplicationMaster\<--\>NodeManager\
    The protocol used by the ApplicationMaster to talk to the NodeManager to
    start/stop containers and get status updates on the containers if needed.

* Writing a Simple Yarn Application

** Writing a simple Client

  * The first step a client needs to take is to connect to the
    ResourceManager or, to be more specific, the ApplicationsManager (AsM)
    interface of the ResourceManager.

+---+
    ClientRMProtocol applicationsManager;
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    Configuration appsManagerServerConf = new Configuration(conf);
    appsManagerServerConf.setClass(
        YarnConfiguration.YARN_SECURITY_INFO,
        ClientRMSecurityInfo.class, SecurityInfo.class);
    applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, appsManagerServerConf));
+---+

  * Once a handle is obtained to the ASM, the client needs to request the
    ResourceManager for a new ApplicationId.

+---+
    GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);
    GetNewApplicationResponse response =
        applicationsManager.getNewApplication(request);
    LOG.info("Got new ApplicationId=" + response.getApplicationId());
+---+

  * The response from the ASM for a new application also contains
    information about the cluster, such as the minimum/maximum resource
    capabilities of the cluster. This is required to ensure that you can
    correctly set the specifications of the container in which the
    ApplicationMaster will be launched. Please refer to
    GetNewApplicationResponse for more details. A sketch of reading these
    limits is shown below.

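  For example (a sketch, assuming GetNewApplicationResponse exposes the
  min/max capabilities as described above; the variable names are
  illustrative):

+---+
    // Cluster-wide container limits returned along with the new
    // ApplicationId. Memory asks for the AM container must fit within
    // these bounds.
    int minMem = response.getMinimumResourceCapability().getMemory();
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Min mem capability of resources in this cluster " + minMem);
    LOG.info("Max mem capability of resources in this cluster " + maxMem);
+---+
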
  * The main crux of the client is to set up the ApplicationSubmissionContext
    which defines all the information needed by the ResourceManager to
    launch the ApplicationMaster. A client needs to set the following into
    the context:

    * Application info: id, name

    * Queue, priority info: the queue to which the application will be
      submitted, and the priority to be assigned to the application.

    * User: the user submitting the application.

    * ContainerLaunchContext: the information defining the container in
      which the ApplicationMaster will be launched and run. The
      ContainerLaunchContext, as mentioned previously, defines all the
      required information needed to run the ApplicationMaster such as the
      local resources (binaries, jars, files etc.), security tokens,
      environment settings (CLASSPATH etc.) and the command to be executed.

    []

+---+
    // Create a new ApplicationSubmissionContext
    ApplicationSubmissionContext appContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    // set the ApplicationId
    appContext.setApplicationId(appId);
    // set the application name
    appContext.setApplicationName(appName);

    // Create a new container launch context for the AM's container
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);

    // Define the local resources required
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();
    // Let's assume the jar we need for our ApplicationMaster is available
    // in HDFS at a certain known path to us and we want to make it
    // available to the ApplicationMaster in the launched container
    Path jarPath; // <- known path to jar file
    FileStatus jarStatus = fs.getFileStatus(jarPath);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
    // Set the type of resource - file or archive
    // archives are untarred at the destination by the framework
    amJarRsrc.setType(LocalResourceType.FILE);
    // Set visibility of the resource
    // Setting to most private option i.e. this file will only
    // be visible to this instance of the running application
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    // Set the location of the resource to be copied over into the
    // working directory
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    // Set timestamp and length of file so that the framework
    // can do basic sanity checks for the local resource
    // after it has been copied over to ensure it is the same
    // resource the client intended to use with the application
    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
    amJarRsrc.setSize(jarStatus.getLen());
    // The framework will create a symlink called AppMaster.jar in the
    // working directory that will be linked back to the actual file.
    // The ApplicationMaster, if it needs to reference the jar file, would
    // need to use the symlink filename.
    localResources.put("AppMaster.jar", amJarRsrc);
    // Set the local resources into the launch context
    amContainer.setLocalResources(localResources);

    // Set up the environment needed for the launch context
    Map<String, String> env = new HashMap<String, String>();
    // For example, we could set up the classpath needed.
    // Assuming our classes or jars are available as local resources in the
    // working directory from which the command will be run, we need to
    // append "." to the path.
    // By default, all the hadoop-specific classpaths will already be
    // available in $CLASSPATH, so we should be careful not to overwrite it.
    String classPathEnv = "$CLASSPATH:./*:";
    env.put("CLASSPATH", classPathEnv);
    amContainer.setEnvironment(env);

    // Construct the command to be executed on the launched container
    String command =
        "${JAVA_HOME}" + "/bin/java" +
        " MyAppMaster" +
        " arg1 arg2 arg3" +
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    // add additional commands if needed

    // Set the command array into the container spec
    amContainer.setCommands(commands);

    // Define the resource requirements for the container
    // For now, YARN only supports memory so we set the memory
    // requirements.
    // If the process takes more than its allocated memory, it will
    // be killed by the framework.
    // Memory being requested should be less than the max capability
    // of the cluster and all asks should be a multiple of the min
    // capability.
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    amContainer.setResource(capability);

    // Set the container launch context into the
    // ApplicationSubmissionContext
    appContext.setAMContainerSpec(amContainer);
+---+

  * After the setup process is complete, the client is finally ready to
    submit the application to the ASM.

+---+
    // Create the request to send to the ApplicationsManager
    SubmitApplicationRequest appRequest =
        Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);

    // Submit the application to the ApplicationsManager
    // Ignore the response as either a valid response object is returned on
    // success or an exception thrown to denote the failure
    applicationsManager.submitApplication(appRequest);
+---+

  * At this point, the ResourceManager will have accepted the application
    and, in the background, will go through the process of allocating a
    container with the required specifications and then eventually setting
    up and launching the ApplicationMaster on the allocated container.

  * There are multiple ways a client can track progress of the actual task.

    * It can communicate with the ResourceManager and request a report of
      the application via ClientRMProtocol#getApplicationReport.

+---+
      GetApplicationReportRequest reportRequest =
          Records.newRecord(GetApplicationReportRequest.class);
      reportRequest.setApplicationId(appId);
      GetApplicationReportResponse reportResponse =
          applicationsManager.getApplicationReport(reportRequest);
      ApplicationReport report = reportResponse.getApplicationReport();
+---+

      The ApplicationReport received from the ResourceManager consists of
      the following:

      * General application information: the ApplicationId, the queue to
        which the application was submitted, the user who submitted the
        application and the start time of the application.

      * ApplicationMaster details: the host on which the ApplicationMaster
        is running, the rpc port (if any) on which it is listening for
        requests from clients and a token that the client needs to
        communicate with the ApplicationMaster.

      * Application tracking information: if the application supports some
        form of progress tracking, it can set a tracking url which is
        available via ApplicationReport#getTrackingUrl that a client can
        look at to monitor progress.

      * ApplicationStatus: the state of the application as seen by the
        ResourceManager is available via
        ApplicationReport#getYarnApplicationState. If the
        YarnApplicationState is set to FINISHED, the client should refer to
        ApplicationReport#getFinalApplicationStatus to check for the actual
        success/failure of the application task itself. In case of failures,
        ApplicationReport#getDiagnostics may be useful to shed some more
        light on the failure.

    * If the ApplicationMaster supports it, a client can directly query the
      ApplicationMaster itself for progress updates via the host:rpcport
      information obtained from the ApplicationReport. It can also use the
      tracking url obtained from the report, if available. A simple polling
      loop is sketched below.

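  For example, a client could poll the ResourceManager until the application
  finishes (a sketch, assuming the appId and applicationsManager handles
  from above; the one-second interval is illustrative):

+---+
    while (true) {
      // assumes the enclosing method declares throws InterruptedException
      Thread.sleep(1000);
      GetApplicationReportRequest reportRequest =
          Records.newRecord(GetApplicationReportRequest.class);
      reportRequest.setApplicationId(appId);
      ApplicationReport report = applicationsManager.getApplicationReport(
          reportRequest).getApplicationReport();
      YarnApplicationState state = report.getYarnApplicationState();
      if (state == YarnApplicationState.FINISHED
          || state == YarnApplicationState.KILLED
          || state == YarnApplicationState.FAILED) {
        // For FINISHED, check the actual outcome of the application task.
        LOG.info("Application completed."
            + " YarnState=" + state
            + ", FinalStatus=" + report.getFinalApplicationStatus());
        break;
      }
    }
+---+
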
  * In certain situations, if the application is taking too long or due to
    other factors, the client may wish to kill the application. The
    ClientRMProtocol supports the forceKillApplication call that allows a
    client to send a kill signal to the ApplicationMaster via the
    ResourceManager. An ApplicationMaster, if so designed, may also support
    an abort call via its rpc layer that a client may be able to leverage.

+---+
    KillApplicationRequest killRequest =
        Records.newRecord(KillApplicationRequest.class);
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);
+---+

** Writing an ApplicationMaster

  * The ApplicationMaster is the actual owner of the job. It will be
    launched by the ResourceManager and, via the client, will be provided
    all the necessary information and resources about the job that it has
    been tasked with overseeing and completing.

  * As the ApplicationMaster is launched within a container that may (and
    likely will) be sharing a physical host with other containers, given
    the multi-tenancy nature, amongst other issues, it cannot make any
    assumptions about things like pre-configured ports that it can listen
    on.

  * All interactions with the ResourceManager require an
    ApplicationAttemptId (there can be multiple attempts per application in
    case of failures). When the ApplicationMaster starts up, the
    ApplicationAttemptId associated with this particular instance will be
    set in the environment. There are helper apis to convert the value
    obtained from the environment into an ApplicationAttemptId object.

+---+
    Map<String, String> envs = System.getenv();
    ApplicationAttemptId appAttemptID =
        Records.newRecord(ApplicationAttemptId.class);
    if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
      // app attempt id should always be set in the env by the framework
      throw new IllegalArgumentException(
          "ApplicationAttemptId not set in the environment");
    }
    appAttemptID =
        ConverterUtils.toApplicationAttemptId(
            envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
+---+

  * After an ApplicationMaster has initialized itself completely, it needs
    to register with the ResourceManager via
    AMRMProtocol#registerApplicationMaster. The ApplicationMaster always
    communicates via the Scheduler interface of the ResourceManager.

+---+
    // Connect to the Scheduler of the ResourceManager.
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    AMRMProtocol resourceManager =
        (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);

    // Register the AM with the RM
    // Set the required info into the registration request:
    // ApplicationAttemptId,
    // host on which the app master is running
    // rpc port on which the app master accepts requests from the client
    // tracking url for the client to track app master progress
    RegisterApplicationMasterRequest appMasterRequest =
        Records.newRecord(RegisterApplicationMasterRequest.class);
    appMasterRequest.setApplicationAttemptId(appAttemptID);
    appMasterRequest.setHost(appMasterHostname);
    appMasterRequest.setRpcPort(appMasterRpcPort);
    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);

    // The registration response is useful as it provides information about
    // the cluster.
    // Similar to the GetNewApplicationResponse in the client, it provides
    // information about the min/max resource capabilities of the cluster
    // that would be needed by the ApplicationMaster when requesting
    // containers.
    RegisterApplicationMasterResponse response =
        resourceManager.registerApplicationMaster(appMasterRequest);
+---+

  * The ApplicationMaster has to emit heartbeats to the ResourceManager to
    keep it informed that the ApplicationMaster is alive and still running.
    The timeout expiry interval at the ResourceManager is defined by a
    config setting accessible via
    YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, with the default defined by
    YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. The
    AMRMProtocol#allocate calls to the ResourceManager count as heartbeats
    as they also support sending progress update information. Therefore, an
    allocate call with no containers requested and progress information
    updated, if any, is a valid way of making heartbeat calls to the
    ResourceManager, as sketched below.

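  For example (a sketch, assuming the resourceManager proxy and appAttemptID
  from above; lastResponseId and currentProgress are illustrative state kept
  by the ApplicationMaster):

+---+
    // An allocate call with no container asks and no releases doubles as a
    // heartbeat to the ResourceManager.
    AllocateRequest heartbeat = Records.newRecord(AllocateRequest.class);
    heartbeat.setApplicationAttemptId(appAttemptID);
    heartbeat.setResponseId(lastResponseId);
    heartbeat.setProgress(currentProgress);
    resourceManager.allocate(heartbeat);
+---+
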
  * Based on the task requirements, the ApplicationMaster can ask for a set
    of containers to run its tasks on. The ApplicationMaster has to use the
    ResourceRequest class to define the following container specifications:

    * Hostname: whether containers are required to be hosted on a
      particular rack or a specific host. '*' is a special value that
      implies any host will do.

    * Resource capability: currently, YARN only supports memory-based
      resource requirements, so the request should define how much memory
      is needed. The value is defined in MB, has to be less than the max
      capability of the cluster, and must be an exact multiple of the min
      capability (a normalization sketch follows this list).

    * Priority: when asking for sets of containers, an ApplicationMaster
      may assign different priorities to each set. For example, the
      Map-Reduce ApplicationMaster may assign a higher priority to
      containers needed for the Map tasks and a lower priority to
      containers for the Reduce tasks.

    []

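  Since memory asks must be a multiple of the minimum capability, a client
  or ApplicationMaster can normalize its request first (a sketch; minMem and
  maxMem come from the registration/new-application responses as described
  earlier, and requestedMem is illustrative):

+---+
    // Round the ask up to the next multiple of the cluster minimum and
    // cap it at the cluster maximum.
    int containerMemory = Math.min(maxMem,
        ((requestedMem + minMem - 1) / minMem) * minMem);
+---+
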
+---+
    // Resource Request
    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);

    // setup requirements for hosts
    // whether a particular rack/host is needed
    // useful for applications that are sensitive
    // to data locality
    rsrcRequest.setHostName("*");

    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    rsrcRequest.setPriority(pri);

    // Set up resource type requirements
    // For now, only memory is supported so we set memory requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    rsrcRequest.setCapability(capability);

    // set no. of containers needed
    // matching the specifications
    rsrcRequest.setNumContainers(numContainers);
+---+

  * After defining the container requirements, the ApplicationMaster has to
    construct an AllocateRequest to send to the ResourceManager.
    The AllocateRequest consists of:

    * Requested containers: the container specifications and the number of
      containers being requested by the ApplicationMaster from the
      ResourceManager.

    * Released containers: the ApplicationMaster may have requested more
      containers than it needs or, due to failures, may decide not to use
      some containers allocated to it. In all such situations, it is
      beneficial to the cluster if the ApplicationMaster releases these
      containers back to the ResourceManager so that they can be
      re-allocated to other applications.

    * ResponseId: the response id that will be sent back in the response
      from the allocate call.

    * Progress update information: the ApplicationMaster can send its
      progress update (a value in the range 0 to 1) to the ResourceManager.

    []

+---+
    List<ResourceRequest> requestedContainers;
    List<ContainerId> releasedContainers;
    AllocateRequest req = Records.newRecord(AllocateRequest.class);

    // The response id set in the request will be sent back in
    // the response so that the ApplicationMaster can
    // match it to its original ask and act appropriately.
    req.setResponseId(rmRequestID);

    // Set ApplicationAttemptId
    req.setApplicationAttemptId(appAttemptID);

    // Add the list of containers being asked for
    req.addAllAsks(requestedContainers);

    // If the ApplicationMaster has no need for certain
    // containers due to over-allocation or for any other
    // reason, it can release them back to the ResourceManager
    req.addAllReleases(releasedContainers);

    // Assuming the ApplicationMaster can track its progress
    req.setProgress(currentProgress);

    AllocateResponse allocateResponse = resourceManager.allocate(req);
+---+

  * The AllocateResponse sent back from the ResourceManager provides the
    following information via the AMResponse object:

    * Reboot flag: for scenarios when the ApplicationMaster may get out of
      sync with the ResourceManager.

    * Allocated containers: the containers that have been allocated to the
      ApplicationMaster.

    * Headroom: headroom for resources in the cluster. Based on this
      information and knowing its needs, an ApplicationMaster can make
      intelligent decisions such as re-prioritizing sub-tasks to take
      advantage of currently allocated containers, bailing out faster if
      resources are not becoming available, etc.

    * Completed containers: once an ApplicationMaster triggers a launch on
      an allocated container, it will receive an update from the
      ResourceManager when the container completes. The ApplicationMaster
      can look into the status of the completed container and take
      appropriate actions such as re-trying a particular sub-task in case
      of a failure.

    []

  One thing to note is that containers will not be immediately allocated to
  the ApplicationMaster. This does not imply that the ApplicationMaster
  should keep asking for the pending count of required containers. Once an
  allocate request has been sent, the ApplicationMaster will eventually be
  allocated the containers based on cluster capacity, priorities and the
  scheduling policy in place. The ApplicationMaster should request
  containers again only if its original estimate changed and it needs
  additional containers.

+---+
    // Get AMResponse from AllocateResponse
    AMResponse amResp = allocateResponse.getAMResponse();

    // Retrieve the list of allocated containers from the response
    // and, on each allocated container, let's assume we are launching
    // the same job.
    List<Container> allocatedContainers = amResp.getAllocatedContainers();
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching shell command on a new container."
          + ", containerId=" + allocatedContainer.getId()
          + ", containerNode=" + allocatedContainer.getNodeId().getHost()
          + ":" + allocatedContainer.getNodeId().getPort()
          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
          + ", containerState=" + allocatedContainer.getState()
          + ", containerResourceMemory="
          + allocatedContainer.getResource().getMemory());

      // Launch and start the container on a separate thread to keep the
      // main thread unblocked as all containers may not be allocated at
      // one go.
      LaunchContainerRunnable runnableLaunchContainer =
          new LaunchContainerRunnable(allocatedContainer);
      Thread launchThread = new Thread(runnableLaunchContainer);
      launchThreads.add(launchThread);
      launchThread.start();
    }

    // Check what the current available resources in the cluster are
    Resource availableResources = amResp.getAvailableResources();
    // Based on this information, an ApplicationMaster can make appropriate
    // decisions

    // Check the completed containers
    // Let's assume we are keeping a count of total completed containers,
    // containers that failed and ones that completed successfully.
    List<ContainerStatus> completedContainers =
        amResp.getCompletedContainersStatuses();
    for (ContainerStatus containerStatus : completedContainers) {
      LOG.info("Got container status for containerID="
          + containerStatus.getContainerId()
          + ", state=" + containerStatus.getState()
          + ", exitStatus=" + containerStatus.getExitStatus()
          + ", diagnostics=" + containerStatus.getDiagnostics());

      int exitStatus = containerStatus.getExitStatus();
      if (0 != exitStatus) {
        // container failed
        // -100 is a special case where the container
        // was aborted/pre-empted for some reason
        if (-100 != exitStatus) {
          // application job on container returned a non-zero exit code
          // counts as completed
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
        }
        else {
          // something else bad happened
          // app job did not complete for some reason
          // we should re-try as the container was lost for some reason
          // decrementing the requested count so that we ask for an
          // additional one in the next allocate call.
          numRequestedContainers.decrementAndGet();
          // we do not need to release the container as that has already
          // been done by the ResourceManager/NodeManager.
        }
      }
      else {
        // nothing to do
        // container completed successfully
        numCompletedContainers.incrementAndGet();
        numSuccessfulContainers.incrementAndGet();
      }
    }
+---+

  * After a container has been allocated to the ApplicationMaster, it needs
    to follow a similar process to the one the client followed when setting
    up the ContainerLaunchContext for the eventual task that is going to
    run on the allocated container. Once the ContainerLaunchContext is
    defined, the ApplicationMaster can then communicate with the
    ContainerManager to start its allocated container.

+---+
    // Assuming an allocated Container obtained from the AMResponse
    Container container;
    // Connect to the ContainerManager on the allocated container
    String cmIpPortStr = container.getNodeId().getHost() + ":"
        + container.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    ContainerManager cm =
        (ContainerManager) rpc.getProxy(
            ContainerManager.class, cmAddress, conf);

    // Now we set up a ContainerLaunchContext
    ContainerLaunchContext ctx =
        Records.newRecord(ContainerLaunchContext.class);

    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());

    try {
      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    } catch (IOException e) {
      LOG.info("Getting current user failed when trying to launch the"
          + " container, message=" + e.getMessage());
    }

    // Set the environment
    Map<String, String> unixEnv;
    // Setup the required env.
    // Please note that the launched container does not inherit
    // the environment of the ApplicationMaster so all the
    // necessary environment settings will need to be re-setup
    // for this allocated container.
    ctx.setEnvironment(unixEnv);

    // Set the local resources
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();
    // Again, the local resources from the ApplicationMaster are not copied
    // over by default to the allocated container. Thus, it is the
    // responsibility of the ApplicationMaster to set up all the necessary
    // local resources needed by the job that will be executed on the
    // allocated container.

    // Assume that we are executing a shell script on the allocated
    // container and the shell script's location in the filesystem is known
    // to us.
    Path shellScriptPath;
    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
    shellRsrc.setType(LocalResourceType.FILE);
    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    shellRsrc.setResource(
        ConverterUtils.getYarnUrlFromURI(shellScriptPath.toUri()));
    shellRsrc.setTimestamp(shellScriptPathTimestamp);
    shellRsrc.setSize(shellScriptPathLen);
    localResources.put("MyExecShell.sh", shellRsrc);

    ctx.setLocalResources(localResources);

    // Set the necessary command to execute on the allocated container
    String command = "/bin/sh ./MyExecShell.sh"
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    ctx.setCommands(commands);

    // Send the start request to the ContainerManager
    StartContainerRequest startReq =
        Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    cm.startContainer(startReq);
+---+

  * The ApplicationMaster, as mentioned previously, will get updates on
    completed containers as part of the response to the
    AMRMProtocol#allocate calls. It can also monitor its launched
    containers pro-actively by querying the ContainerManager for their
    status.

+---+
    GetContainerStatusRequest statusReq =
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    LOG.info("Container Status"
        + ", id=" + container.getId()
        + ", status=" + statusResp.getStatus());
+---+

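  * Finally, as mentioned in the flow above, once its work is done the
    ApplicationMaster should signal completion to the ResourceManager via
    AMRMProtocol#finishApplicationMaster (a sketch, assuming record setter
    names consistent with this release; diagnostics is an illustrative
    message string and the final status would be FAILED on an unsuccessful
    run):

+---+
    // Unregister from the ResourceManager once all containers have
    // completed and the job outcome is known.
    FinishApplicationMasterRequest finishReq =
        Records.newRecord(FinishApplicationMasterRequest.class);
    finishReq.setAppAttemptId(appAttemptID);
    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
    finishReq.setDiagnostics(diagnostics);
    resourceManager.finishApplicationMaster(finishReq);
+---+
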
~~** Defining the context in which your code runs

~~*** Container Resource Requests

~~*** Local Resources

~~*** Environment

~~**** Managing the CLASSPATH

~~** Security

* FAQ

** How can I distribute my application's jars to all of the nodes in the
   YARN cluster that need it?

  You can use the LocalResource to add resources to your application
  request. This will cause YARN to distribute the resource to the
  ApplicationMaster node. If the resource is a tgz, zip, or jar, you can
  have YARN unzip it. Then, all you need to do is add the unzipped folder
  to your classpath. For example, when creating your application request:

+---+
    File packageFile = new File(packagePath);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(
        FileContext.getFileContext().makeQualified(new Path(packagePath)));

    LocalResource packageResource = Records.newRecord(LocalResource.class);
    packageResource.setResource(packageUrl);
    packageResource.setSize(packageFile.length());
    packageResource.setTimestamp(packageFile.lastModified());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    resource.setMemory(memory);
    containerCtx.setResource(resource);
    containerCtx.setCommands(ImmutableList.of(
        "java -cp './package/*' some.class.to.Run "
        + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
        + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
    containerCtx.setLocalResources(
        Collections.singletonMap("package", packageResource));
    appCtx.setApplicationId(appId);
    appCtx.setUser(user.getShortUserName());
    appCtx.setAMContainerSpec(containerCtx);
    request.setApplicationSubmissionContext(appCtx);
    applicationsManager.submitApplication(request);
+---+

  As you can see, the setLocalResources command takes a map of names to
  resources. The name becomes a symlink in your application's cwd, so you
  can just refer to the artifacts inside by using ./package/*.

  Note: Java's classpath (cp) argument is VERY sensitive.
  Make sure you get the syntax EXACTLY correct.

  Once your package is distributed to your ApplicationMaster, you'll need
  to follow the same process whenever your ApplicationMaster starts a new
  container (assuming you want the resources to be sent to your container).
  The code for this is the same. You just need to make sure that you give
  your ApplicationMaster the package path (either HDFS or local), so that
  it can send the resource URL along with the container ctx.

** How do I get the ApplicationMaster's ApplicationAttemptId?

  The ApplicationAttemptId will be passed to the ApplicationMaster via the
  environment, and the value from the environment can be converted into an
  ApplicationAttemptId object via the ConverterUtils helper function.

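  For example (a sketch, using the environment constant and helper shown in
  the ApplicationMaster section above):

+---+
    // The framework sets the attempt id in the container's environment.
    String attemptIdStr =
        System.getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
    ApplicationAttemptId appAttemptID =
        ConverterUtils.toApplicationAttemptId(attemptIdStr);
+---+
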
** My container is being killed by the NodeManager

  This is likely due to high memory usage exceeding your requested container
  memory size. There are a number of reasons that can cause this. First,
  look at the process tree that the NodeManager dumps when it kills your
  container. The two things you're interested in are physical memory and
  virtual memory. If you have exceeded the physical memory limit, your app
  is using too much physical memory. If you're running a Java app, you can
  use hprof to look at what is taking up space in the heap. If you have
  exceeded the virtual memory limit, things are slightly more complicated.

* Useful Links

  * {{{https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf}Map Reduce Next Generation Architecture}}

  * {{{http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/}Map Reduce Next Generation Scheduler}}

@@ -0,0 +1,47 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~   http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.

  ---
  Hadoop MapReduce Next Generation ${project.version}
  ---
  ---
  ${maven.build.timestamp}

Hadoop MapReduce Next Generation

* Architecture

  The new architecture, introduced in hadoop-0.23, divides the two major
  functions of the JobTracker, resource management and job life-cycle
  management, into separate components.

  The new ResourceManager manages the global assignment of compute
  resources to applications and the per-application ApplicationMaster
  manages the application's scheduling and coordination.

  An application is either a single job in the sense of classic MapReduce
  jobs or a DAG of such jobs.

  The ResourceManager and the per-machine NodeManager daemon, which manages
  the user processes on that machine, form the computation fabric. The
  per-application ApplicationMaster is, in effect, a framework-specific
  library and is tasked with negotiating resources from the ResourceManager
  and working with the NodeManager(s) to execute and monitor the tasks.

* User Documentation

  * {{{./SingleCluster.html}Setting up a Single Node Cluster}}

  * {{{./apidocs/index.html}JavaDocs}}

  * {{{./WritingYarnApplications.html}Writing YARN Applications}}

@@ -425,5 +425,6 @@
    <module>hadoop-yarn-common</module>
    <module>hadoop-yarn-server</module>
    <module>hadoop-yarn-applications</module>
    <module>hadoop-yarn-site</module>
  </modules>
</project>
@ -1,39 +0,0 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~   http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.

  ---
  Hadoop MapReduce Next Generation ${project.version}
  ---
  ---
  ${maven.build.timestamp}

Hadoop MapReduce Next Generation

* Architecture

  The new architecture introduced in 0.23, divides the two major functions
  of the JobTracker, resource management and job scheduling/monitoring, into
  separate components.
  The new ResourceManager manages the global assignment of compute resources
  to applications and the per-application ApplicationMaster manages the
  application's scheduling and coordination.
  An application is either a single job in the classic MapReduce jobs or a
  DAG of such jobs.
  The ResourceManager and per-machine NodeManager server, which manages the
  user processes on that machine, form the computation fabric. The
  per-application ApplicationMaster is, in effect, a framework specific
  library and is tasked with negotiating resources from the ResourceManager
  and working with the NodeManager(s) to execute and monitor the tasks.

* User Documentation

  * {{{./SingleCluster.html}SingleCluster}}

  * {{{./apidocs/index.html}JavaDocs}}