From 88b82a0f6687ce103817fbb460fd30d870f717a0 Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Wed, 14 Sep 2011 07:26:37 +0000 Subject: [PATCH] MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a ContainerLaunchContext (Arun Murthy via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170459 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 119 +--- .../hadoop/mapreduce/v2/util/MRApps.java | 129 +++++ .../org/apache/hadoop/mapred/YARNRunner.java | 223 ++------ .../records/ApplicationSubmissionContext.java | 218 +------- .../api/records/ContainerLaunchContext.java | 4 +- .../ApplicationSubmissionContextPBImpl.java | 512 ++---------------- .../impl/pb/ContainerLaunchContextPBImpl.java | 35 +- .../src/main/proto/yarn_protos.proto | 18 +- .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../launcher/ContainerLaunch.java | 6 +- .../resourcemanager/ClientRMService.java | 13 +- .../server/resourcemanager/RMAppManager.java | 34 +- .../RMAppManagerSubmitEvent.java | 1 - .../amlauncher/AMLauncher.java | 48 +- .../resourcemanager/rmapp/RMAppImpl.java | 6 +- .../rmapp/attempt/RMAppAttemptImpl.java | 3 +- .../resourcemanager/webapp/AppsBlock.java | 4 +- .../yarn/server/resourcemanager/MockRM.java | 9 +- .../resourcemanager/TestAppManager.java | 40 +- .../rmapp/TestRMAppTransitions.java | 2 +- .../TestContainerTokenSecretManager.java | 20 +- 22 files changed, 373 insertions(+), 1080 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f33c179694..0b1dbb9885 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -289,6 +289,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via mahadev) + MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a + ContainerLaunchContext (Arun Murthy via mahadev) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3d4dcb5ed0..17cef5a26a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -579,13 +579,12 @@ private ContainerLaunchContext createContainerLaunchContext() { + remoteJobConfPath.toUri().toASCIIString()); // //////////// End of JobConf setup - // Setup DistributedCache - setupDistributedCache(remoteFS, conf, localResources, environment); + MRApps.setupDistributedCache(conf, localResources, environment); // Set local-resources and environment container.setLocalResources(localResources); - container.setEnv(environment); + container.setEnvironment(environment); // Setup up tokens Credentials taskCredentials = new Credentials(); @@ -618,7 +617,7 @@ private ContainerLaunchContext createContainerLaunchContext() { ShuffleHandler.serializeServiceData(jobToken)); container.setServiceData(serviceData); - MRApps.addToClassPath(container.getEnv(), getInitialClasspath()); + MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath()); } catch (IOException e) { throw new YarnException(e); } @@ -645,7 +644,7 @@ private ContainerLaunchContext createContainerLaunchContext() { taskAttemptListener.getAddress(), remoteTask, javaHome, workDir.toString(), containerLogDir, childTmpDir, jvmID)); - MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths, + MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths, workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask, localizedApplicationTokensFile); @@ -656,116 +655,6 @@ private ContainerLaunchContext createContainerLaunchContext() { return container; } - private static long[] parseTimeStamps(String[] strs) { - if (null == strs) { - return null; - } - long[] result = new long[strs.length]; - for(int i=0; i < strs.length; ++i) { - result[i] = Long.parseLong(strs[i]); - } - return result; - } - - private void setupDistributedCache(FileSystem remoteFS, - Configuration conf, - Map localResources, - Map env) - throws IOException { - - // Cache archives - parseDistributedCacheArtifacts(remoteFS, localResources, env, - LocalResourceType.ARCHIVE, - DistributedCache.getCacheArchives(conf), - parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), - getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), - DistributedCache.getArchiveVisibilities(conf), - DistributedCache.getArchiveClassPaths(conf)); - - // Cache files - parseDistributedCacheArtifacts(remoteFS, - localResources, env, - LocalResourceType.FILE, - DistributedCache.getCacheFiles(conf), - parseTimeStamps(DistributedCache.getFileTimestamps(conf)), - getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), - DistributedCache.getFileVisibilities(conf), - DistributedCache.getFileClassPaths(conf)); - } - - // TODO - Move this to MR! - // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], - // long[], boolean[], Path[], FileType) - private void parseDistributedCacheArtifacts( - FileSystem remoteFS, - Map localResources, - Map env, - LocalResourceType type, - URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], - Path[] pathsToPutOnClasspath) throws IOException { - - if (uris != null) { - // Sanity check - if ((uris.length != timestamps.length) || (uris.length != sizes.length) || - (uris.length != visibilities.length)) { - throw new IllegalArgumentException("Invalid specification for " + - "distributed-cache artifacts of type " + type + " :" + - " #uris=" + uris.length + - " #timestamps=" + timestamps.length + - " #visibilities=" + visibilities.length - ); - } - - Map classPaths = new HashMap(); - if (pathsToPutOnClasspath != null) { - for (Path p : pathsToPutOnClasspath) { - p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory())); - classPaths.put(p.toUri().getPath().toString(), p); - } - } - for (int i = 0; i < uris.length; ++i) { - URI u = uris[i]; - Path p = new Path(u); - p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory())); - // Add URI fragment or just the filename - Path name = new Path((null == u.getFragment()) - ? p.getName() - : u.getFragment()); - if (name.isAbsolute()) { - throw new IllegalArgumentException("Resource name must be relative"); - } - String linkName = name.toUri().getPath(); - localResources.put( - linkName, - BuilderUtils.newLocalResource( - p.toUri(), type, - visibilities[i] - ? LocalResourceVisibility.PUBLIC - : LocalResourceVisibility.PRIVATE, - sizes[i], timestamps[i]) - ); - if (classPaths.containsKey(u.getPath())) { - MRApps.addToClassPath(env, linkName); - } - } - } - } - - // TODO - Move this to MR! - private static long[] getFileSizes(Configuration conf, String key) { - String[] strs = conf.getStrings(key); - if (strs == null) { - return null; - } - long[] result = new long[strs.length]; - for(int i=0; i < strs.length; ++i) { - result[i] = Long.parseLong(strs[i]); - } - return result; - } - @Override public ContainerId getAssignedContainerID() { readLock.lock(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 5dfa1dcfe4..68499497ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -25,14 +25,20 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.URI; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -42,12 +48,18 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.util.BuilderUtils; /** * Helper class for MR applications */ +@Private +@Unstable public class MRApps extends Apps { public static final String JOB = "job"; public static final String TASK = "task"; @@ -232,4 +244,121 @@ public static String getJobFile(Configuration conf, String user, jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE); return jobFile.toString(); } + + + + private static long[] parseTimeStamps(String[] strs) { + if (null == strs) { + return null; + } + long[] result = new long[strs.length]; + for(int i=0; i < strs.length; ++i) { + result[i] = Long.parseLong(strs[i]); + } + return result; + } + + public static void setupDistributedCache( + Configuration conf, + Map localResources, + Map env) + throws IOException { + + // Cache archives + parseDistributedCacheArtifacts(conf, localResources, env, + LocalResourceType.ARCHIVE, + DistributedCache.getCacheArchives(conf), + parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), + getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), + DistributedCache.getArchiveVisibilities(conf), + DistributedCache.getArchiveClassPaths(conf)); + + // Cache files + parseDistributedCacheArtifacts(conf, + localResources, env, + LocalResourceType.FILE, + DistributedCache.getCacheFiles(conf), + parseTimeStamps(DistributedCache.getFileTimestamps(conf)), + getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), + DistributedCache.getFileVisibilities(conf), + DistributedCache.getFileClassPaths(conf)); + } + + // TODO - Move this to MR! + // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], + // long[], boolean[], Path[], FileType) + private static void parseDistributedCacheArtifacts( + Configuration conf, + Map localResources, + Map env, + LocalResourceType type, + URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], + Path[] pathsToPutOnClasspath) throws IOException { + + if (uris != null) { + // Sanity check + if ((uris.length != timestamps.length) || (uris.length != sizes.length) || + (uris.length != visibilities.length)) { + throw new IllegalArgumentException("Invalid specification for " + + "distributed-cache artifacts of type " + type + " :" + + " #uris=" + uris.length + + " #timestamps=" + timestamps.length + + " #visibilities=" + visibilities.length + ); + } + + Map classPaths = new HashMap(); + if (pathsToPutOnClasspath != null) { + for (Path p : pathsToPutOnClasspath) { + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + classPaths.put(p.toUri().getPath().toString(), p); + } + } + for (int i = 0; i < uris.length; ++i) { + URI u = uris[i]; + Path p = new Path(u); + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + // Add URI fragment or just the filename + Path name = new Path((null == u.getFragment()) + ? p.getName() + : u.getFragment()); + if (name.isAbsolute()) { + throw new IllegalArgumentException("Resource name must be relative"); + } + String linkName = name.toUri().getPath(); + localResources.put( + linkName, + BuilderUtils.newLocalResource( + p.toUri(), type, + visibilities[i] + ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.PRIVATE, + sizes[i], timestamps[i]) + ); + if (classPaths.containsKey(u.getPath())) { + MRApps.addToClassPath(env, linkName); + } + } + } + } + + // TODO - Move this to MR! + private static long[] getFileSizes(Configuration conf, String key) { + String[] strs = conf.getStrings(key); + if (strs == null) { + return null; + } + long[] result = new long[strs.length]; + for(int i=0; i < strs.length; ++i) { + result[i] = Long.parseLong(strs[i]); + } + return result; + } + + + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index fa167a0acf..3751646010 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -19,7 +19,6 @@ package org.apache.hadoop.mapred; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -33,7 +32,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; @@ -55,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.MRConstants; @@ -72,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -237,7 +235,6 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); - setupDistributedCache(conf, appContext); // XXX Remove in.close(); @@ -273,16 +270,18 @@ private LocalResource createApplicationResource(FileContext fs, Path p) public ApplicationSubmissionContext createApplicationSubmissionContext( Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException { - ApplicationSubmissionContext appContext = - recordFactory.newRecordInstance(ApplicationSubmissionContext.class); ApplicationId applicationId = resMgrDelegate.getApplicationId(); - appContext.setApplicationId(applicationId); + + // Setup resource requirements Resource capability = recordFactory.newRecordInstance(Resource.class); capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB)); LOG.info("AppMaster capability = " + capability); - appContext.setMasterCapability(capability); + // Setup LocalResources + Map localResources = + new HashMap(); + Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE); URL yarnUrlForJobSubmitDir = ConverterUtils @@ -292,14 +291,11 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( LOG.debug("Creating setup context, jobSubmitDir url is " + yarnUrlForJobSubmitDir); - appContext.setResource(MRConstants.JOB_SUBMIT_DIR, - yarnUrlForJobSubmitDir); - - appContext.setResourceTodo(MRConstants.JOB_CONF_FILE, + localResources.put(MRConstants.JOB_CONF_FILE, createApplicationResource(defaultFileContext, jobConfPath)); if (jobConf.get(MRJobConfig.JAR) != null) { - appContext.setResourceTodo(MRConstants.JOB_JAR, + localResources.put(MRConstants.JOB_JAR, createApplicationResource(defaultFileContext, new Path(jobSubmitDir, MRConstants.JOB_JAR))); } else { @@ -312,30 +308,21 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( // TODO gross hack for (String s : new String[] { "job.split", "job.splitmetainfo", MRConstants.APPLICATION_TOKENS_FILE }) { - appContext.setResourceTodo( + localResources.put( MRConstants.JOB_SUBMIT_DIR + "/" + s, - createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s))); - } - - // TODO: Only if security is on. - List fsTokens = new ArrayList(); - for (Token token : ts.getAllTokens()) { - fsTokens.add(token.encodeToUrlString()); + createApplicationResource(defaultFileContext, + new Path(jobSubmitDir, s))); } - // TODO - Remove this! - appContext.addAllFsTokens(fsTokens); - DataOutputBuffer dob = new DataOutputBuffer(); - ts.writeTokenStorageToStream(dob); - appContext.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + // Setup security tokens + ByteBuffer securityTokens = null; + if (UserGroupInformation.isSecurityEnabled()) { + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } - // Add queue information - appContext.setQueue(jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME)); - - // Add job name - appContext.setApplicationName(jobConf.get(JobContext.JOB_NAME, "N/A")); - - // Add the command line + // Setup the command to run the AM String javaHome = "$JAVA_HOME"; Vector vargs = new Vector(8); vargs.add(javaHome + "/bin/java"); @@ -346,13 +333,6 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS)); - // Add { job jar, MR app jar } to classpath. - Map environment = new HashMap(); - MRApps.setInitialClasspath(environment); - MRApps.addToClassPath(environment, MRConstants.JOB_JAR); - MRApps.addToClassPath(environment, - MRConstants.YARN_MAPREDUCE_APP_JAR_PATH); - appContext.addAllEnvironment(environment); vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster"); vargs.add(String.valueOf(applicationId.getClusterTimestamp())); vargs.add(String.valueOf(applicationId.getId())); @@ -370,140 +350,43 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( LOG.info("Command to launch container for ApplicationMaster is : " + mergedCommand); + + // Setup the environment - Add { job jar, MR app jar } to classpath. + Map environment = new HashMap(); + MRApps.setInitialClasspath(environment); + MRApps.addToClassPath(environment, MRConstants.JOB_JAR); + MRApps.addToClassPath(environment, + MRConstants.YARN_MAPREDUCE_APP_JAR_PATH); + + // Parse distributed cache + MRApps.setupDistributedCache(jobConf, localResources, environment); + + // Setup ContainerLaunchContext for AM container + ContainerLaunchContext amContainer = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + amContainer.setResource(capability); // Resource (mem) required + amContainer.setLocalResources(localResources); // Local resources + amContainer.setEnvironment(environment); // Environment + amContainer.setCommands(vargsFinal); // Command for AM + amContainer.setContainerTokens(securityTokens); // Security tokens + + // Set up the ApplicationSubmissionContext + ApplicationSubmissionContext appContext = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + appContext.setApplicationId(applicationId); // ApplicationId + appContext.setUser( // User name + UserGroupInformation.getCurrentUser().getShortUserName()); + appContext.setQueue( // Queue name + jobConf.get(JobContext.QUEUE_NAME, + YarnConfiguration.DEFAULT_QUEUE_NAME)); + appContext.setApplicationName( // Job name + jobConf.get(JobContext.JOB_NAME, + YarnConfiguration.DEFAULT_APPLICATION_NAME)); + appContext.setAMContainerSpec(amContainer); // AM Container - appContext.addAllCommands(vargsFinal); - // TODO: RM should get this from RPC. - appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); return appContext; } - /** - * * TODO: Copied for now from TaskAttemptImpl.java ... fixme - * @param strs - * @return - */ - private static long[] parseTimeStamps(String[] strs) { - if (null == strs) { - return null; - } - long[] result = new long[strs.length]; - for(int i=0; i < strs.length; ++i) { - result[i] = Long.parseLong(strs[i]); - } - return result; - } - - /** - * TODO: Copied for now from TaskAttemptImpl.java ... fixme - * - * TODO: This is currently needed in YarnRunner as user code like setupJob, - * cleanupJob may need access to dist-cache. Once we separate distcache for - * maps, reduces, setup etc, this can include only a subset of artificats. - * This is also needed for uberAM case where we run everything inside AM. - */ - private void setupDistributedCache(Configuration conf, - ApplicationSubmissionContext container) throws IOException { - - // Cache archives - parseDistributedCacheArtifacts(conf, container, LocalResourceType.ARCHIVE, - DistributedCache.getCacheArchives(conf), - parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), - getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), - DistributedCache.getArchiveVisibilities(conf), - DistributedCache.getArchiveClassPaths(conf)); - - // Cache files - parseDistributedCacheArtifacts(conf, container, LocalResourceType.FILE, - DistributedCache.getCacheFiles(conf), - parseTimeStamps(DistributedCache.getFileTimestamps(conf)), - getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), - DistributedCache.getFileVisibilities(conf), - DistributedCache.getFileClassPaths(conf)); - } - - // TODO - Move this to MR! - // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType) - private void parseDistributedCacheArtifacts(Configuration conf, - ApplicationSubmissionContext container, LocalResourceType type, - URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], - Path[] pathsToPutOnClasspath) throws IOException { - - if (uris != null) { - // Sanity check - if ((uris.length != timestamps.length) || (uris.length != sizes.length) || - (uris.length != visibilities.length)) { - throw new IllegalArgumentException("Invalid specification for " + - "distributed-cache artifacts of type " + type + " :" + - " #uris=" + uris.length + - " #timestamps=" + timestamps.length + - " #visibilities=" + visibilities.length - ); - } - - Map classPaths = new HashMap(); - if (pathsToPutOnClasspath != null) { - for (Path p : pathsToPutOnClasspath) { - FileSystem fs = p.getFileSystem(conf); - p = p.makeQualified(fs.getUri(), fs.getWorkingDirectory()); - classPaths.put(p.toUri().getPath().toString(), p); - } - } - for (int i = 0; i < uris.length; ++i) { - URI u = uris[i]; - Path p = new Path(u); - FileSystem fs = p.getFileSystem(conf); - p = fs.resolvePath( - p.makeQualified(fs.getUri(), fs.getWorkingDirectory())); - // Add URI fragment or just the filename - Path name = new Path((null == u.getFragment()) - ? p.getName() - : u.getFragment()); - if (name.isAbsolute()) { - throw new IllegalArgumentException("Resource name must be relative"); - } - String linkName = name.toUri().getPath(); - container.setResourceTodo( - linkName, - createLocalResource( - p.toUri(), type, - visibilities[i] - ? LocalResourceVisibility.PUBLIC - : LocalResourceVisibility.PRIVATE, - sizes[i], timestamps[i]) - ); - if (classPaths.containsKey(u.getPath())) { - Map environment = container.getAllEnvironment(); - MRApps.addToClassPath(environment, linkName); - } - } - } - } - - // TODO - Move this to MR! - private static long[] getFileSizes(Configuration conf, String key) { - String[] strs = conf.getStrings(key); - if (strs == null) { - return null; - } - long[] result = new long[strs.length]; - for(int i=0; i < strs.length; ++i) { - result[i] = Long.parseLong(strs[i]); - } - return result; - } - - private LocalResource createLocalResource(URI uri, - LocalResourceType type, LocalResourceVisibility visibility, - long size, long timestamp) throws IOException { - LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class); - resource.setResource(ConverterUtils.getYarnUrlFromURI(uri)); - resource.setType(type); - resource.setVisibility(visibility); - resource.setSize(size); - resource.setTimestamp(timestamp); - return resource; - } - @Override public void setJobPriority(JobID arg0, String arg1) throws IOException, InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 46511ca0d2..0f1243fd9f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -18,14 +18,8 @@ package org.apache.hadoop.yarn.api.records; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ClientRMProtocol; /** @@ -36,26 +30,17 @@ *

It includes details such as: *

    *
  • {@link ApplicationId} of the application.
  • - *
  • - * {@link Resource} necessary to run the ApplicationMaster. - *
  • *
  • Application user.
  • *
  • Application name.
  • *
  • {@link Priority} of the application.
  • - *
  • Security tokens (if security is enabled).
  • *
  • - * {@link LocalResource} necessary for running the - * ApplicationMaster container such - * as binaries, jar, shared-objects, side-files etc. + * {@link ContainerLaunchContext} of the container in which the + * ApplicationMaster is executed. *
  • - *
  • - * Environment variables for the launched ApplicationMaster - * process. - *
  • - *
  • Command to launch the ApplicationMaster.
  • *
*

* + * @see ContainerLaunchContext * @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest) */ @Public @@ -143,198 +128,25 @@ public interface ApplicationSubmissionContext { public void setUser(String user); /** - * Get the Resource required to run the - * ApplicationMaster. - * @return Resource required to run the - * ApplicationMaster + * Get the ContainerLaunchContext to describe the + * Container with which the ApplicationMaster is + * launched. + * @return ContainerLaunchContext for the + * ApplicationMaster container */ @Public @Stable - public Resource getMasterCapability(); + public ContainerLaunchContext getAMContainerSpec(); /** - * Set Resource required to run the - * ApplicationMaster. - * @param masterCapability Resource required to run the - * ApplicationMaster + * Set the ContainerLaunchContext to describe the + * Container with which the ApplicationMaster is + * launched. + * @param amContainer ContainerLaunchContext for the + * ApplicationMaster container */ @Public @Stable - public void setMasterCapability(Resource masterCapability); - - @Private - @Unstable - public Map getAllResources(); - - @Private - @Unstable - public URL getResource(String key); - - @Private - @Unstable - public void addAllResources(Map resources); + public void setAMContainerSpec(ContainerLaunchContext amContainer); - @Private - @Unstable - public void setResource(String key, URL url); - - @Private - @Unstable - public void removeResource(String key); - - @Private - @Unstable - public void clearResources(); - - /** - * Get all the LocalResource required to run the - * ApplicationMaster. - * @return LocalResource required to run the - * ApplicationMaster - */ - @Public - @Stable - public Map getAllResourcesTodo(); - - @Private - @Unstable - public LocalResource getResourceTodo(String key); - - /** - * Add all the LocalResource required to run the - * ApplicationMaster. - * @param resources all LocalResource required to run the - * ApplicationMaster - */ - @Public - @Stable - public void addAllResourcesTodo(Map resources); - - @Private - @Unstable - public void setResourceTodo(String key, LocalResource localResource); - - @Private - @Unstable - public void removeResourceTodo(String key); - - @Private - @Unstable - public void clearResourcesTodo(); - - @Private - @Unstable - public List getFsTokenList(); - - @Private - @Unstable - public String getFsToken(int index); - - @Private - @Unstable - public int getFsTokenCount(); - - @Private - @Unstable - public void addAllFsTokens(List fsTokens); - - @Private - @Unstable - public void addFsToken(String fsToken); - - @Private - @Unstable - public void removeFsToken(int index); - - @Private - @Unstable - public void clearFsTokens(); - - /** - * Get file-system tokens for the ApplicationMaster. - * @return file-system tokens for the ApplicationMaster - */ - @Public - @Stable - public ByteBuffer getFsTokensTodo(); - - /** - * Set file-system tokens for the ApplicationMaster. - * @param fsTokens file-system tokens for the ApplicationMaster - */ - @Public - @Stable - public void setFsTokensTodo(ByteBuffer fsTokens); - - /** - * Get the environment variables for the - * ApplicationMaster. - * @return environment variables for the ApplicationMaster - */ - @Public - @Stable - public Map getAllEnvironment(); - - @Private - @Unstable - public String getEnvironment(String key); - - /** - * Add all of the environment variables for the - * ApplicationMaster. - * @param environment environment variables for the - * ApplicationMaster - */ - @Public - @Stable - public void addAllEnvironment(Map environment); - - @Private - @Unstable - public void setEnvironment(String key, String env); - - @Private - @Unstable - public void removeEnvironment(String key); - - @Private - @Unstable - public void clearEnvironment(); - - /** - * Get the commands to launch the ApplicationMaster. - * @return commands to launch the ApplicationMaster - */ - @Public - @Stable - public List getCommandList(); - - @Private - @Unstable - public String getCommand(int index); - - @Private - @Unstable - public int getCommandCount(); - - /** - * Add all of the commands to launch the - * ApplicationMaster. - * @param commands commands to launch the ApplicationMaster - */ - @Public - @Stable - public void addAllCommands(List commands); - - @Private - @Unstable - public void addCommand(String command); - - @Private - @Unstable - public void removeCommand(int index); - - @Private - @Unstable - public void clearCommands(); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 0339df9af1..52452b54e1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -156,7 +156,7 @@ public interface ContainerLaunchContext { */ @Public @Stable - Map getEnv(); + Map getEnvironment(); /** * Add environment variables for the container. @@ -164,7 +164,7 @@ public interface ContainerLaunchContext { */ @Public @Stable - void setEnv(Map environment); + void setEnvironment(Map environment); /** * Get the list of commands for launching the container. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 2b4841888a..1f8b5c24b1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -18,56 +18,35 @@ package org.apache.hadoop.yarn.api.records.impl.pb; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; -import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; -import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; -import org.apache.hadoop.yarn.proto.YarnProtos.StringURLMapProto; -import org.apache.hadoop.yarn.proto.YarnProtos.URLProto; - - -public class ApplicationSubmissionContextPBImpl extends ProtoBase implements ApplicationSubmissionContext { - ApplicationSubmissionContextProto proto = ApplicationSubmissionContextProto.getDefaultInstance(); +public class ApplicationSubmissionContextPBImpl +extends ProtoBase +implements ApplicationSubmissionContext { + ApplicationSubmissionContextProto proto = + ApplicationSubmissionContextProto.getDefaultInstance(); ApplicationSubmissionContextProto.Builder builder = null; boolean viaProto = false; private ApplicationId applicationId = null; - private Resource masterCapability = null; - private Map resources = null; - private Map resourcesTodo = null; - private List fsTokenList = null; - private ByteBuffer fsTokenTodo = null; - private Map environment = null; - private List commandList = null; private Priority priority = null; - - + private ContainerLaunchContext amContainer = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); } - public ApplicationSubmissionContextPBImpl(ApplicationSubmissionContextProto proto) { + public ApplicationSubmissionContextPBImpl( + ApplicationSubmissionContextProto proto) { this.proto = proto; viaProto = true; } @@ -83,30 +62,12 @@ private void mergeLocalToBuilder() { if (this.applicationId != null) { builder.setApplicationId(convertToProtoFormat(this.applicationId)); } - if (this.masterCapability != null) { - builder.setMasterCapability(convertToProtoFormat(this.masterCapability)); - } - if (this.resources != null) { - addResourcesToProto(); - } - if (this.resourcesTodo != null) { - addResourcesTodoToProto(); - } - if (this.fsTokenList != null) { - addFsTokenListToProto(); - } - if (this.fsTokenTodo != null) { - builder.setFsTokensTodo(convertToProtoFormat(this.fsTokenTodo)); - } - if (this.environment != null) { - addEnvironmentToProto(); - } - if (this.commandList != null) { - addCommandsToProto(); - } if (this.priority != null) { builder.setPriority(convertToProtoFormat(this.priority)); } + if (this.amContainer != null) { + builder.setAmContainerSpec(convertToProtoFormat(this.amContainer)); + } } private void mergeLocalToProto() { @@ -145,6 +106,7 @@ public void setPriority(Priority priority) { builder.clearPriority(); this.priority = priority; } + @Override public ApplicationId getApplicationId() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; @@ -165,6 +127,7 @@ public void setApplicationId(ApplicationId applicationId) { builder.clearApplicationId(); this.applicationId = applicationId; } + @Override public String getApplicationName() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; @@ -183,403 +146,7 @@ public void setApplicationName(String applicationName) { } builder.setApplicationName((applicationName)); } - @Override - public Resource getMasterCapability() { - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - if (this.masterCapability != null) { - return masterCapability; - } // Else via proto - if (!p.hasMasterCapability()) { - return null; - } - masterCapability = convertFromProtoFormat(p.getMasterCapability()); - return this.masterCapability; - } - @Override - public void setMasterCapability(Resource masterCapability) { - maybeInitBuilder(); - if (masterCapability == null) - builder.clearMasterCapability(); - this.masterCapability = masterCapability; - } - @Override - public Map getAllResources() { - initResources(); - return this.resources; - } - @Override - public URL getResource(String key) { - initResources(); - return this.resources.get(key); - } - - private void initResources() { - if (this.resources != null) { - return; - } - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - List mapAsList = p.getResourcesList(); - this.resources = new HashMap(); - - for (StringURLMapProto c : mapAsList) { - this.resources.put(c.getKey(), convertFromProtoFormat(c.getValue())); - } - } - - @Override - public void addAllResources(final Map resources) { - if (resources == null) - return; - initResources(); - this.resources.putAll(resources); - } - - private void addResourcesToProto() { - maybeInitBuilder(); - builder.clearResources(); - if (this.resources == null) - return; - Iterable iterable = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - - Iterator keyIter = resources.keySet().iterator(); - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public StringURLMapProto next() { - String key = keyIter.next(); - return StringURLMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resources.get(key))).build(); - } - - @Override - public boolean hasNext() { - return keyIter.hasNext(); - } - }; - } - }; - builder.addAllResources(iterable); - } - @Override - public void setResource(String key, URL val) { - initResources(); - this.resources.put(key, val); - } - @Override - public void removeResource(String key) { - initResources(); - this.resources.remove(key); - } - @Override - public void clearResources() { - initResources(); - this.resources.clear(); - } - @Override - public Map getAllResourcesTodo() { - initResourcesTodo(); - return this.resourcesTodo; - } - @Override - public LocalResource getResourceTodo(String key) { - initResourcesTodo(); - return this.resourcesTodo.get(key); - } - - private void initResourcesTodo() { - if (this.resourcesTodo != null) { - return; - } - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - List mapAsList = p.getResourcesTodoList(); - this.resourcesTodo = new HashMap(); - - for (StringLocalResourceMapProto c : mapAsList) { - this.resourcesTodo.put(c.getKey(), convertFromProtoFormat(c.getValue())); - } - } - - @Override - public void addAllResourcesTodo(final Map resourcesTodo) { - if (resourcesTodo == null) - return; - initResourcesTodo(); - this.resourcesTodo.putAll(resourcesTodo); - } - - private void addResourcesTodoToProto() { - maybeInitBuilder(); - builder.clearResourcesTodo(); - if (resourcesTodo == null) - return; - Iterable iterable = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - - Iterator keyIter = resourcesTodo.keySet().iterator(); - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public StringLocalResourceMapProto next() { - String key = keyIter.next(); - return StringLocalResourceMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resourcesTodo.get(key))).build(); - } - - @Override - public boolean hasNext() { - return keyIter.hasNext(); - } - }; - } - }; - builder.addAllResourcesTodo(iterable); - } - @Override - public void setResourceTodo(String key, LocalResource val) { - initResourcesTodo(); - this.resourcesTodo.put(key, val); - } - @Override - public void removeResourceTodo(String key) { - initResourcesTodo(); - this.resourcesTodo.remove(key); - } - @Override - public void clearResourcesTodo() { - initResourcesTodo(); - this.resourcesTodo.clear(); - } - @Override - public List getFsTokenList() { - initFsTokenList(); - return this.fsTokenList; - } - @Override - public String getFsToken(int index) { - initFsTokenList(); - return this.fsTokenList.get(index); - } - @Override - public int getFsTokenCount() { - initFsTokenList(); - return this.fsTokenList.size(); - } - - private void initFsTokenList() { - if (this.fsTokenList != null) { - return; - } - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getFsTokensList(); - this.fsTokenList = new ArrayList(); - - for (String c : list) { - this.fsTokenList.add(c); - } - } - - @Override - public void addAllFsTokens(final List fsTokens) { - if (fsTokens == null) - return; - initFsTokenList(); - this.fsTokenList.addAll(fsTokens); - } - - private void addFsTokenListToProto() { - maybeInitBuilder(); - builder.clearFsTokens(); - builder.addAllFsTokens(this.fsTokenList); - } - - @Override - public void addFsToken(String fsTokens) { - initFsTokenList(); - this.fsTokenList.add(fsTokens); - } - @Override - public void removeFsToken(int index) { - initFsTokenList(); - this.fsTokenList.remove(index); - } - @Override - public void clearFsTokens() { - initFsTokenList(); - this.fsTokenList.clear(); - } - @Override - public ByteBuffer getFsTokensTodo() { - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - if (this.fsTokenTodo != null) { - return this.fsTokenTodo; - } - if (!p.hasFsTokensTodo()) { - return null; - } - this.fsTokenTodo = convertFromProtoFormat(p.getFsTokensTodo()); - return this.fsTokenTodo; - } - - @Override - public void setFsTokensTodo(ByteBuffer fsTokensTodo) { - maybeInitBuilder(); - if (fsTokensTodo == null) - builder.clearFsTokensTodo(); - this.fsTokenTodo = fsTokensTodo; - } - @Override - public Map getAllEnvironment() { - initEnvironment(); - return this.environment; - } - @Override - public String getEnvironment(String key) { - initEnvironment(); - return this.environment.get(key); - } - - private void initEnvironment() { - if (this.environment != null) { - return; - } - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - List mapAsList = p.getEnvironmentList(); - this.environment = new HashMap(); - - for (StringStringMapProto c : mapAsList) { - this.environment.put(c.getKey(), c.getValue()); - } - } - - @Override - public void addAllEnvironment(Map environment) { - if (environment == null) - return; - initEnvironment(); - this.environment.putAll(environment); - } - - private void addEnvironmentToProto() { - maybeInitBuilder(); - builder.clearEnvironment(); - if (environment == null) - return; - Iterable iterable = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - - Iterator keyIter = environment.keySet().iterator(); - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public StringStringMapProto next() { - String key = keyIter.next(); - return StringStringMapProto.newBuilder().setKey(key).setValue((environment.get(key))).build(); - } - - @Override - public boolean hasNext() { - return keyIter.hasNext(); - } - }; - } - }; - builder.addAllEnvironment(iterable); - } - @Override - public void setEnvironment(String key, String val) { - initEnvironment(); - this.environment.put(key, val); - } - @Override - public void removeEnvironment(String key) { - initEnvironment(); - this.environment.remove(key); - } - @Override - public void clearEnvironment() { - initEnvironment(); - this.environment.clear(); - } - @Override - public List getCommandList() { - initCommandList(); - return this.commandList; - } - @Override - public String getCommand(int index) { - initCommandList(); - return this.commandList.get(index); - } - @Override - public int getCommandCount() { - initCommandList(); - return this.commandList.size(); - } - - private void initCommandList() { - if (this.commandList != null) { - return; - } - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getCommandList(); - this.commandList = new ArrayList(); - - for (String c : list) { - this.commandList.add(c); - } - } - - @Override - public void addAllCommands(final List command) { - if (command == null) - return; - initCommandList(); - this.commandList.addAll(command); - } - - private void addCommandsToProto() { - maybeInitBuilder(); - builder.clearCommand(); - if (this.commandList == null) - return; - builder.addAllCommand(this.commandList); - } - @Override - public void addCommand(String command) { - initCommandList(); - this.commandList.add(command); - } - @Override - public void removeCommand(int index) { - initCommandList(); - this.commandList.remove(index); - } - @Override - public void clearCommands() { - initCommandList(); - this.commandList.clear(); - } @Override public String getQueue() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; @@ -598,6 +165,7 @@ public void setQueue(String queue) { } builder.setQueue((queue)); } + @Override public String getUser() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; @@ -617,6 +185,28 @@ public void setUser(String user) { builder.setUser((user)); } + @Override + public ContainerLaunchContext getAMContainerSpec() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.amContainer != null) { + return amContainer; + } // Else via proto + if (!p.hasAmContainerSpec()) { + return null; + } + amContainer = convertFromProtoFormat(p.getAmContainerSpec()); + return amContainer; + } + + @Override + public void setAMContainerSpec(ContainerLaunchContext amContainer) { + maybeInitBuilder(); + if (amContainer == null) { + builder.clearAmContainerSpec(); + } + this.amContainer = amContainer; + } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); } @@ -633,28 +223,12 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId t) { return ((ApplicationIdPBImpl)t).getProto(); } - private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { - return new ResourcePBImpl(p); + private ContainerLaunchContextPBImpl convertFromProtoFormat( + ContainerLaunchContextProto p) { + return new ContainerLaunchContextPBImpl(p); } - private ResourceProto convertToProtoFormat(Resource t) { - return ((ResourcePBImpl)t).getProto(); + private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) { + return ((ContainerLaunchContextPBImpl)t).getProto(); } - - private URLPBImpl convertFromProtoFormat(URLProto p) { - return new URLPBImpl(p); - } - - private URLProto convertToProtoFormat(URL t) { - return ((URLPBImpl)t).getProto(); - } - - private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { - return new LocalResourcePBImpl(p); - } - - private LocalResourceProto convertToProtoFormat(LocalResource t) { - return ((LocalResourcePBImpl)t).getProto(); - } - } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 0696d8327b..de292ad98e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -39,8 +39,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; - - public class ContainerLaunchContextPBImpl extends ProtoBase implements ContainerLaunchContext { @@ -54,10 +52,9 @@ public class ContainerLaunchContextPBImpl private Map localResources = null; private ByteBuffer containerTokens = null; private Map serviceData = null; - private Map env = null; + private Map environment = null; private List commands = null; - public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); } @@ -94,7 +91,7 @@ private void mergeLocalToBuilder() { if (this.serviceData != null) { addServiceDataToProto(); } - if (this.env != null) { + if (this.environment != null) { addEnvToProto(); } if (this.commands != null) { @@ -364,37 +361,37 @@ public boolean hasNext() { } @Override - public Map getEnv() { + public Map getEnvironment() { initEnv(); - return this.env; + return this.environment; } private void initEnv() { - if (this.env != null) { + if (this.environment != null) { return; } ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getEnvList(); - this.env = new HashMap(); + List list = p.getEnvironmentList(); + this.environment = new HashMap(); for (StringStringMapProto c : list) { - this.env.put(c.getKey(), c.getValue()); + this.environment.put(c.getKey(), c.getValue()); } } @Override - public void setEnv(final Map env) { + public void setEnvironment(final Map env) { if (env == null) return; initEnv(); - this.env.clear(); - this.env.putAll(env); + this.environment.clear(); + this.environment.putAll(env); } private void addEnvToProto() { maybeInitBuilder(); - builder.clearEnv(); - if (env == null) + builder.clearEnvironment(); + if (environment == null) return; Iterable iterable = new Iterable() { @@ -403,7 +400,7 @@ private void addEnvToProto() { public Iterator iterator() { return new Iterator() { - Iterator keyIter = env.keySet().iterator(); + Iterator keyIter = environment.keySet().iterator(); @Override public void remove() { @@ -414,7 +411,7 @@ public void remove() { public StringStringMapProto next() { String key = keyIter.next(); return StringStringMapProto.newBuilder().setKey(key).setValue( - (env.get(key))).build(); + (environment.get(key))).build(); } @Override @@ -424,7 +421,7 @@ public boolean hasNext() { }; } }; - builder.addAllEnv(iterable); + builder.addAllEnvironment(iterable); } private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 61e3d1f5b9..cdcd1a747b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -188,17 +188,11 @@ message AMResponseProto { //////////////////////////////////////////////////////////////////////// message ApplicationSubmissionContextProto { optional ApplicationIdProto application_id = 1; - optional string application_name = 2; - optional ResourceProto master_capability = 3; - repeated StringURLMapProto resources = 4; - repeated StringLocalResourceMapProto resources_todo = 5; - repeated string fs_tokens = 6; - optional bytes fs_tokens_todo = 7; - repeated StringStringMapProto environment = 8; - repeated string command = 9; - optional string queue = 10; - optional PriorityProto priority = 11; - optional string user = 12; + optional string application_name = 2 [default = "N/A"]; + optional string user = 3; + optional string queue = 4 [default = "default"]; + optional PriorityProto priority = 5; + optional ContainerLaunchContextProto am_container_spec = 6; } message YarnClusterMetricsProto { @@ -242,7 +236,7 @@ message ContainerLaunchContextProto { repeated StringLocalResourceMapProto localResources = 4; optional bytes container_tokens = 5; repeated StringBytesMapProto service_data = 6; - repeated StringStringMapProto env = 7; + repeated StringStringMapProto environment = 7; repeated string command = 8; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2169ee3e90..ba23134170 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -219,6 +219,12 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "max-completed-applications"; public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000; + /** Default application name */ + public static final String DEFAULT_APPLICATION_NAME = "N/A"; + + /** Default queue name */ + public static final String DEFAULT_QUEUE_NAME = "default"; + //////////////////////////////// // Node Manager Configs //////////////////////////////// diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 1a34247c30..497460d3e7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -89,7 +89,7 @@ public Integer call() { final Map localResources = container.getLocalizedResources(); String containerIdStr = ConverterUtils.toString(container.getContainerID()); final String user = launchContext.getUser(); - final Map env = launchContext.getEnv(); + final Map env = launchContext.getEnvironment(); final List command = launchContext.getCommands(); int ret = -1; @@ -109,7 +109,7 @@ public Integer call() { } launchContext.setCommands(newCmds); - Map envs = launchContext.getEnv(); + Map envs = launchContext.getEnvironment(); Map newEnvs = new HashMap(envs.size()); for (Entry entry : envs.entrySet()) { newEnvs.put( @@ -118,7 +118,7 @@ public Integer call() { ApplicationConstants.LOG_DIR_EXPANSION_VAR, containerLogDir.toUri().getPath())); } - launchContext.setEnv(newEnvs); + launchContext.setEnvironment(newEnvs); // /////////////////////////// End of variable expansion FileContext lfs = FileContext.getLocalFSFileContext(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 593d6525a6..a31bef8af9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.service.AbstractService; @@ -90,7 +89,6 @@ public class ClientRMService extends AbstractService implements final private AtomicInteger applicationCounter = new AtomicInteger(0); final private YarnScheduler scheduler; final private RMContext rmContext; - private final AMLivelinessMonitor amLivelinessMonitor; private final RMAppManager rmAppManager; private String clientServiceBindAddress; @@ -106,7 +104,6 @@ public ClientRMService(RMContext rmContext, YarnScheduler scheduler, super(ClientRMService.class.getName()); this.scheduler = scheduler; this.rmContext = rmContext; - this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rmAppManager = rmAppManager; } @@ -195,15 +192,18 @@ public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnRemoteException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); - ApplicationId applicationId = null; - String user = null; + ApplicationId applicationId = submissionContext.getApplicationId(); + String user = submissionContext.getUser(); try { user = UserGroupInformation.getCurrentUser().getShortUserName(); - applicationId = submissionContext.getApplicationId(); if (rmContext.getRMApps().get(applicationId) != null) { throw new IOException("Application with id " + applicationId + " is already present! Cannot add a duplicate!"); } + + // Safety + submissionContext.setUser(user); + // This needs to be synchronous as the client can query // immediately following the submission to get the application status. // So call handle directly and do not send an event. @@ -226,6 +226,7 @@ public SubmitApplicationResponse submitApplication( return response; } + @SuppressWarnings("unchecked") @Override public FinishApplicationResponse finishApplication( FinishApplicationRequest request) throws YarnRemoteException { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 9a86dfd457..d0cd0a7ff8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -210,7 +210,9 @@ protected synchronized void checkAppNumCompletedLimit() { } } - protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) { + @SuppressWarnings("unchecked") + protected synchronized void submitApplication( + ApplicationSubmissionContext submissionContext) { ApplicationId applicationId = submissionContext.getApplicationId(); RMApp application = null; try { @@ -224,27 +226,37 @@ protected synchronized void submitApplication(ApplicationSubmissionContext submi clientTokenStr = clientToken.encodeToUrlString(); LOG.debug("Sending client token as " + clientTokenStr); } - submissionContext.setQueue(submissionContext.getQueue() == null - ? "default" : submissionContext.getQueue()); - submissionContext.setApplicationName(submissionContext - .getApplicationName() == null ? "N/A" : submissionContext - .getApplicationName()); + + // Sanity checks + if (submissionContext.getQueue() == null) { + submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + } + if (submissionContext.getApplicationName() == null) { + submissionContext.setApplicationName( + YarnConfiguration.DEFAULT_APPLICATION_NAME); + } + + // Store application for recovery ApplicationStore appStore = rmContext.getApplicationsStore() .createApplicationStore(submissionContext.getApplicationId(), submissionContext); + + // Create RMApp application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, clientTokenStr, - appStore, rmContext.getAMLivelinessMonitor(), this.scheduler, + appStore, this.scheduler, this.masterService); - if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { + if (rmContext.getRMApps().putIfAbsent(applicationId, application) != + null) { LOG.info("Application with id " + applicationId + " is already present! Cannot add a duplicate!"); - // don't send event through dispatcher as it will be handled by app already - // present with this id. + // don't send event through dispatcher as it will be handled by app + // already present with this id. application.handle(new RMAppRejectedEvent(applicationId, - "Application with this id is already present! Cannot add a duplicate!")); + "Application with this id is already present! " + + "Cannot add a duplicate!")); } else { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.START)); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java index 99b3d77fd4..495e784428 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; public class RMAppManagerSubmitEvent extends RMAppManagerEvent { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 1a10993bb0..b394faa85d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -120,7 +119,8 @@ private void launch() throws IOException { + " for AM " + application.getAppAttemptId()); ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID); - StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class); + StartContainerRequest request = + recordFactory.newRecordInstance(StartContainerRequest.class); request.setContainerLaunchContext(launchContext); containerMgrProxy.startContainer(request); LOG.info("Done launching container " + application.getMasterContainer() @@ -130,7 +130,8 @@ private void launch() throws IOException { private void cleanup() throws IOException { connect(); ContainerId containerId = application.getMasterContainer().getId(); - StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class); + StopContainerRequest stopRequest = + recordFactory.newRecordInstance(StopContainerRequest.class); stopRequest.setContainerId(containerId); containerMgrProxy.stopContainer(stopRequest); } @@ -145,7 +146,7 @@ private ContainerManager getContainerMgrProxy( final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. UserGroupInformation currentUser = - UserGroupInformation.createRemoteUser("TODO"); // TODO + UserGroupInformation.createRemoteUser("yarn"); // TODO if (UserGroupInformation.isSecurityEnabled()) { ContainerToken containerToken = container.getContainerToken(); Token token = @@ -170,8 +171,8 @@ private ContainerLaunchContext createAMContainerLaunchContext( ContainerId containerID) throws IOException { // Construct the actual Container - ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class); - container.setCommands(applicationMasterContext.getCommandList()); + ContainerLaunchContext container = + applicationMasterContext.getAMContainerSpec(); StringBuilder mergedCommand = new StringBuilder(); String failCount = Integer.toString(application.getAppAttemptId() .getAttemptId()); @@ -189,34 +190,28 @@ private ContainerLaunchContext createAMContainerLaunchContext( LOG.info("Command to launch container " + containerID + " : " + mergedCommand); - Map environment = - applicationMasterContext.getAllEnvironment(); - environment.putAll(setupTokensInEnv(applicationMasterContext)); - container.setEnv(environment); - - // Construct the actual Container + + // Finalize the container container.setContainerId(containerID); container.setUser(applicationMasterContext.getUser()); - container.setResource(applicationMasterContext.getMasterCapability()); - container.setLocalResources(applicationMasterContext.getAllResourcesTodo()); - container.setContainerTokens(applicationMasterContext.getFsTokensTodo()); + setupTokensAndEnv(container); + return container; } - private Map setupTokensInEnv( - ApplicationSubmissionContext asc) + private void setupTokensAndEnv( + ContainerLaunchContext container) throws IOException { - Map env = - new HashMap(); + Map environment = container.getEnvironment(); if (UserGroupInformation.isSecurityEnabled()) { // TODO: Security enabled/disabled info should come from RM. Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); - if (asc.getFsTokensTodo() != null) { + if (container.getContainerTokens() != null) { // TODO: Don't do this kind of checks everywhere. - dibb.reset(asc.getFsTokensTodo()); + dibb.reset(container.getContainerTokens()); credentials.readTokenStorageStream(dibb); } @@ -236,14 +231,16 @@ private Map setupTokensInEnv( token.setService(new Text(resolvedAddr)); String appMasterTokenEncoded = token.encodeToUrlString(); LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded); - env.put(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME, + environment.put( + ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME, appMasterTokenEncoded); // Add the RM token credentials.addToken(new Text(resolvedAddr), token); DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); - asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + container.setContainerTokens( + ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier( application.getAppAttemptId().getApplicationId()); @@ -252,9 +249,10 @@ private Map setupTokensInEnv( String encoded = Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded()); LOG.debug("The encoded client secret-key to be put in env : " + encoded); - env.put(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded); + environment.put( + ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, + encoded); } - return env; } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 015c76163e..65ee9945e2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -86,7 +86,6 @@ public class RMAppImpl implements RMApp { // Mutable fields private long startTime; private long finishTime; - private AMLivelinessMonitor amLivelinessMonitor; private RMAppAttempt currentAttempt; private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); @@ -163,7 +162,7 @@ RMAppEventType.KILL, new AppKilledTransition()) public RMAppImpl(ApplicationId applicationId, RMContext rmContext, Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, String clientTokenStr, - ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor, + ApplicationStore appStore, YarnScheduler scheduler, ApplicationMasterService masterService) { this.applicationId = applicationId; @@ -176,7 +175,6 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.submissionContext = submissionContext; this.clientTokenStr = clientTokenStr; this.appStore = appStore; - this.amLivelinessMonitor = amLivelinessMonitor; this.scheduler = scheduler; this.masterService = masterService; this.startTime = System.currentTimeMillis(); @@ -380,6 +378,7 @@ public void handle(RMAppEvent event) { } } + @SuppressWarnings("unchecked") private void createNewAttempt() { ApplicationAttemptId appAttemptId = Records .newRecord(ApplicationAttemptId.class); @@ -434,6 +433,7 @@ private Set getNodesOnWhichAttemptRan(RMAppImpl app) { return nodes; } + @SuppressWarnings("unchecked") public void transition(RMAppImpl app, RMAppEvent event) { Set nodes = getNodesOnWhichAttemptRan(app); for (NodeId nodeId : nodes) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 6daff1d88e..12eca4d82f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -84,6 +84,7 @@ public class RMAppAttemptImpl implements RMAppAttempt { RMAppAttemptEvent> stateMachine; private final RMContext rmContext; + @SuppressWarnings("rawtypes") private final EventHandler eventHandler; private final YarnScheduler scheduler; private final ApplicationMasterService masterService; @@ -459,7 +460,7 @@ public void transition(RMAppAttemptImpl appAttempt, // Request a container for the AM. ResourceRequest request = BuilderUtils.newResourceRequest( AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext - .getMasterCapability(), 1); + .getAMContainerSpec().getResource(), 1); LOG.debug("About to request resources for AM of " + appAttempt.applicationAttemptId + " required " + request); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java index 94649923cb..afdec298a1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; @@ -59,7 +58,8 @@ class AppsBlock extends HtmlBlock { String appId = app.getApplicationId().toString(); String trackingUrl = app.getTrackingUrl(); String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" : - (app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory"); + (app.getFinishTime() == 0 ? + "ApplicationMaster URL" : "JobHistory URL"); String percent = String.format("%.1f", app.getProgress() * 100); tbody. tr(). diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 901948fab7..4be2739967 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -81,13 +82,17 @@ public RMApp submitApp(int masterMemory) throws Exception { ApplicationId appId = resp.getApplicationId(); SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class); - ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class); + ApplicationSubmissionContext sub = + Records.newRecord(ApplicationSubmissionContext.class); sub.setApplicationId(appId); sub.setApplicationName(""); sub.setUser(""); + ContainerLaunchContext clc = + Records.newRecord(ContainerLaunchContext.class); Resource capability = Records.newRecord(Resource.class); capability.setMemory(masterMemory); - sub.setMasterCapability(capability); + clc.setResource(capability); + sub.setAMContainerSpec(clc); req.setApplicationSubmissionContext(sub); client.submitApplication(req); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index bd66a6337f..afdeb16177 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -18,19 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static org.mockito.Mockito.*; -import java.util.ArrayList; import java.util.List; -import java.util.LinkedList; -import java.util.Map; import java.util.concurrent.ConcurrentMap; - import junit.framework.Assert; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -44,7 +37,6 @@ import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; @@ -63,8 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.service.Service; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import com.google.common.collect.Maps; import com.google.common.collect.Lists; @@ -75,7 +65,6 @@ */ public class TestAppManager{ - private static final Log LOG = LogFactory.getLog(TestAppManager.class); private static RMAppEventType appEventType = RMAppEventType.KILL; public synchronized RMAppEventType getAppEventType() { @@ -117,10 +106,8 @@ public ConcurrentMap getRMApps() { public class TestAppManagerDispatcher implements EventHandler { - private final RMContext rmContext; - public TestAppManagerDispatcher(RMContext rmContext) { - this.rmContext = rmContext; + public TestAppManagerDispatcher() { } @Override @@ -132,15 +119,11 @@ public void handle(RMAppManagerEvent event) { public class TestDispatcher implements EventHandler { - private final RMContext rmContext; - - public TestDispatcher(RMContext rmContext) { - this.rmContext = rmContext; + public TestDispatcher() { } @Override public void handle(RMAppEvent event) { - ApplicationId appID = event.getApplicationId(); //RMApp rmApp = this.rmContext.getRMApps().get(appID); setAppEventType(event.getType()); System.out.println("in handle routine " + getAppEventType().toString()); @@ -178,7 +161,8 @@ public int getCompletedAppsListSize() { public void setCompletedAppsMax(int max) { super.setCompletedAppsMax(max); } - public void submitApplication(ApplicationSubmissionContext submissionContext) { + public void submitApplication( + ApplicationSubmissionContext submissionContext) { super.submitApplication(submissionContext); } } @@ -336,8 +320,9 @@ public void testRMAppRetireZeroSetting() throws Exception { } protected void setupDispatcher(RMContext rmContext, Configuration conf) { - TestDispatcher testDispatcher = new TestDispatcher(rmContext); - TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext); + TestDispatcher testDispatcher = new TestDispatcher(); + TestAppManagerDispatcher testAppManagerDispatcher = + new TestAppManagerDispatcher(); rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher); rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher); ((Service)rmContext.getDispatcher()).init(conf); @@ -359,7 +344,8 @@ public void testRMAppSubmit() throws Exception { ApplicationId appID = MockApps.newAppID(1); RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + ApplicationSubmissionContext context = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); context.setApplicationId(appID); setupDispatcher(rmContext, conf); @@ -367,8 +353,12 @@ public void testRMAppSubmit() throws Exception { RMApp app = rmContext.getRMApps().get(appID); Assert.assertNotNull("app is null", app); Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); - Assert.assertEquals("app name doesn't match", "N/A", app.getName()); - Assert.assertEquals("app queue doesn't match", "default", app.getQueue()); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, + app.getName()); + Assert.assertEquals("app queue doesn't match", + YarnConfiguration.DEFAULT_QUEUE_NAME, + app.getQueue()); Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); Assert.assertNotNull("app store is null", app.getApplicationStore()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 56bac77209..56b3f4b18a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -128,7 +128,7 @@ protected RMApp createNewTestApp() { RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, clientTokenStr, - appStore, rmContext.getAMLivelinessMonitor(), scheduler, + appStore, scheduler, masterService); testAppStartState(applicationId, user, name, queue, application); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java index 3214898277..989f3483d9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java @@ -27,6 +27,8 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import junit.framework.Assert; @@ -54,10 +56,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationMaster; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -137,15 +140,11 @@ public void test() throws IOException, InterruptedException { ApplicationSubmissionContext appSubmissionContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); appSubmissionContext.setApplicationId(appID); - appSubmissionContext.setMasterCapability(recordFactory - .newRecordInstance(Resource.class)); - appSubmissionContext.getMasterCapability().setMemory(1024); -// appSubmissionContext.resources = new HashMap(); + ContainerLaunchContext amContainer = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + amContainer.setResource(Resources.createResource(1024)); + amContainer.setCommands(Arrays.asList("sleep", "100")); appSubmissionContext.setUser("testUser"); -// appSubmissionContext.environment = new HashMap(); -// appSubmissionContext.command = new ArrayList(); - appSubmissionContext.addCommand("sleep"); - appSubmissionContext.addCommand("100"); // TODO: Use a resource to work around bugs. Today NM doesn't create local // app-dirs if there are no file to download!! @@ -162,10 +161,11 @@ public void test() throws IOException, InterruptedException { rsrc.setTimestamp(file.lastModified()); rsrc.setType(LocalResourceType.FILE); rsrc.setVisibility(LocalResourceVisibility.PRIVATE); - appSubmissionContext.setResourceTodo("testFile", rsrc); + amContainer.setLocalResources(Collections.singletonMap("testFile", rsrc)); SubmitApplicationRequest submitRequest = recordFactory .newRecordInstance(SubmitApplicationRequest.class); submitRequest.setApplicationSubmissionContext(appSubmissionContext); + appSubmissionContext.setAMContainerSpec(amContainer); resourceManager.getClientRMService().submitApplication(submitRequest); // Wait till container gets allocated for AM