From 845529b3ab338e759665a687eb525fb2cccde7bf Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Mon, 3 Apr 2017 13:06:24 +0900 Subject: [PATCH] MAPREDUCE-6824. TaskAttemptImpl#createCommonContainerLaunchContext is longer than 150 lines. Contributed by Chris Trezzo. --- .../v2/app/job/impl/TaskAttemptImpl.java | 279 ++++++++++-------- 1 file changed, 150 insertions(+), 129 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 4305824070..9ea1b9aa92 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -755,7 +755,7 @@ private static ContainerLaunchContext createCommonContainerLaunchContext( new HashMap(); // Application environment - Map environment = new HashMap(); + Map environment; // Service data Map serviceData = new HashMap(); @@ -763,157 +763,178 @@ private static ContainerLaunchContext createCommonContainerLaunchContext( // Tokens ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{}); try { - FileSystem remoteFS = FileSystem.get(conf); - // //////////// Set up JobJar to be localized properly on the remote NM. - String jobJar = conf.get(MRJobConfig.JAR); - if (jobJar != null) { - final Path jobJarPath = new Path(jobJar); - final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); - Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), - jobJarFs.getWorkingDirectory()); - LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, - LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); - String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, - JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); - rc.setPattern(pattern); - localResources.put(MRJobConfig.JOB_JAR, rc); - LOG.info("The job-jar file on the remote FS is " - + remoteJobJar.toUri().toASCIIString()); - } else { - // Job jar may be null. For e.g, for pipes, the job jar is the hadoop - // mapreduce jar itself which is already on the classpath. - LOG.info("Job jar is not present. " - + "Not adding any jar to the list of resources."); - } - // //////////// End of JobJar setup + configureJobJar(conf, localResources); - // //////////// Set up JobConf to be localized properly on the remote NM. - Path path = - MRApps.getStagingAreaDir(conf, UserGroupInformation - .getCurrentUser().getShortUserName()); - Path remoteJobSubmitDir = - new Path(path, oldJobId.toString()); - Path remoteJobConfPath = - new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); - localResources.put( - MRJobConfig.JOB_CONF_FILE, - createLocalResource(remoteFS, remoteJobConfPath, - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); - LOG.info("The job-conf file on the remote FS is " - + remoteJobConfPath.toUri().toASCIIString()); - // //////////// End of JobConf setup + configureJobConf(conf, localResources, oldJobId); // Setup DistributedCache MRApps.setupDistributedCache(conf, localResources); - // Setup up task credentials buffer - LOG.info("Adding #" + credentials.numberOfTokens() - + " tokens and #" + credentials.numberOfSecretKeys() - + " secret keys for NM use for launching container"); - Credentials taskCredentials = new Credentials(credentials); - - // LocalStorageToken is needed irrespective of whether security is enabled - // or not. - TokenCache.setJobToken(jobToken, taskCredentials); - - DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); - LOG.info("Size of containertokens_dob is " - + taskCredentials.numberOfTokens()); - taskCredentials.writeTokenStorageToStream(containerTokens_dob); taskCredentialsBuffer = - ByteBuffer.wrap(containerTokens_dob.getData(), 0, - containerTokens_dob.getLength()); + configureTokens(jobToken, credentials, serviceData); - // Add shuffle secret key - // The secret key is converted to a JobToken to preserve backwards - // compatibility with an older ShuffleHandler running on an NM. - LOG.info("Putting shuffle token in serviceData"); - byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials); - if (shuffleSecret == null) { - LOG.warn("Cannot locate shuffle secret in credentials." - + " Using job token as shuffle secret."); - shuffleSecret = jobToken.getPassword(); - } - Token shuffleToken = new Token( - jobToken.getIdentifier(), shuffleSecret, jobToken.getKind(), - jobToken.getService()); - serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, - ShuffleHandler.serializeServiceData(shuffleToken)); + addExternalShuffleProviders(conf, serviceData); - // add external shuffle-providers - if any - Collection shuffleProviders = conf.getStringCollection( - MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES); - if (! shuffleProviders.isEmpty()) { - Collection auxNames = conf.getStringCollection( - YarnConfiguration.NM_AUX_SERVICES); + environment = configureEnv(conf); - for (final String shuffleProvider : shuffleProviders) { - if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) { - continue; // skip built-in shuffle-provider that was already inserted with shuffle secret key - } - if (auxNames.contains(shuffleProvider)) { - LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData"); - // This only serves for INIT_APP notifications - // The shuffle service needs to be able to work with the host:port information provided by the AM - // (i.e. shuffle services which require custom location / other configuration are not supported) - serviceData.put(shuffleProvider, ByteBuffer.allocate(0)); - } - else { - throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider + - " was NOT found in the list of aux-services that are available in this NM." + - " You may need to specify this ShuffleProvider as an aux-service in your yarn-site.xml"); - } - } - } - - MRApps.addToEnvironment( - environment, - Environment.CLASSPATH.name(), - getInitialClasspath(conf), conf); - - if (initialAppClasspath != null) { - MRApps.addToEnvironment( - environment, - Environment.APP_CLASSPATH.name(), - initialAppClasspath, conf); - } } catch (IOException e) { throw new YarnRuntimeException(e); } - // Shell - environment.put( - Environment.SHELL.name(), - conf.get( - MRJobConfig.MAPRED_ADMIN_USER_SHELL, - MRJobConfig.DEFAULT_SHELL) - ); - - // Add pwd to LD_LIBRARY_PATH, add this before adding anything else - MRApps.addToEnvironment( - environment, - Environment.LD_LIBRARY_PATH.name(), - MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf); - - // Add the env variables passed by the admin - MRApps.setEnvFromInputString( - environment, - conf.get( - MRJobConfig.MAPRED_ADMIN_USER_ENV, - MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf - ); - // Construct the actual Container // The null fields are per-container and will be constructed for each // container separately. ContainerLaunchContext container = ContainerLaunchContext.newInstance(localResources, environment, null, - serviceData, taskCredentialsBuffer, applicationACLs); + serviceData, taskCredentialsBuffer, applicationACLs); return container; } + private static Map configureEnv(Configuration conf) + throws IOException { + Map environment = new HashMap(); + MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(), + getInitialClasspath(conf), conf); + + if (initialAppClasspath != null) { + MRApps.addToEnvironment(environment, Environment.APP_CLASSPATH.name(), + initialAppClasspath, conf); + } + + // Shell + environment.put(Environment.SHELL.name(), conf + .get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL)); + + // Add pwd to LD_LIBRARY_PATH, add this before adding anything else + MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(), + MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf); + + // Add the env variables passed by the admin + MRApps.setEnvFromInputString(environment, + conf.get(MRJobConfig.MAPRED_ADMIN_USER_ENV, + MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), + conf); + return environment; + } + + private static void configureJobJar(Configuration conf, + Map localResources) throws IOException { + // Set up JobJar to be localized properly on the remote NM. + String jobJar = conf.get(MRJobConfig.JAR); + if (jobJar != null) { + final Path jobJarPath = new Path(jobJar); + final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); + Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), + jobJarFs.getWorkingDirectory()); + LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, + LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); + String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, + JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); + rc.setPattern(pattern); + localResources.put(MRJobConfig.JOB_JAR, rc); + LOG.info("The job-jar file on the remote FS is " + + remoteJobJar.toUri().toASCIIString()); + } else { + // Job jar may be null. For e.g, for pipes, the job jar is the hadoop + // mapreduce jar itself which is already on the classpath. + LOG.info("Job jar is not present. " + + "Not adding any jar to the list of resources."); + } + } + + private static void configureJobConf(Configuration conf, + Map localResources, + final org.apache.hadoop.mapred.JobID oldJobId) throws IOException { + // Set up JobConf to be localized properly on the remote NM. + Path path = MRApps.getStagingAreaDir(conf, + UserGroupInformation.getCurrentUser().getShortUserName()); + Path remoteJobSubmitDir = new Path(path, oldJobId.toString()); + Path remoteJobConfPath = + new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); + FileSystem remoteFS = FileSystem.get(conf); + localResources.put(MRJobConfig.JOB_CONF_FILE, + createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION)); + LOG.info("The job-conf file on the remote FS is " + + remoteJobConfPath.toUri().toASCIIString()); + } + + private static ByteBuffer configureTokens(Token jobToken, + Credentials credentials, + Map serviceData) throws IOException { + // Setup up task credentials buffer + LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #" + + credentials.numberOfSecretKeys() + + " secret keys for NM use for launching container"); + Credentials taskCredentials = new Credentials(credentials); + + // LocalStorageToken is needed irrespective of whether security is enabled + // or not. + TokenCache.setJobToken(jobToken, taskCredentials); + + DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); + LOG.info( + "Size of containertokens_dob is " + taskCredentials.numberOfTokens()); + taskCredentials.writeTokenStorageToStream(containerTokens_dob); + ByteBuffer taskCredentialsBuffer = + ByteBuffer.wrap(containerTokens_dob.getData(), 0, + containerTokens_dob.getLength()); + + // Add shuffle secret key + // The secret key is converted to a JobToken to preserve backwards + // compatibility with an older ShuffleHandler running on an NM. + LOG.info("Putting shuffle token in serviceData"); + byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials); + if (shuffleSecret == null) { + LOG.warn("Cannot locate shuffle secret in credentials." + + " Using job token as shuffle secret."); + shuffleSecret = jobToken.getPassword(); + } + Token shuffleToken = + new Token(jobToken.getIdentifier(), shuffleSecret, + jobToken.getKind(), jobToken.getService()); + serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeServiceData(shuffleToken)); + return taskCredentialsBuffer; + } + + private static void addExternalShuffleProviders(Configuration conf, + Map serviceData) { + // add external shuffle-providers - if any + Collection shuffleProviders = conf.getStringCollection( + MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES); + if (!shuffleProviders.isEmpty()) { + Collection auxNames = + conf.getStringCollection(YarnConfiguration.NM_AUX_SERVICES); + + for (final String shuffleProvider : shuffleProviders) { + if (shuffleProvider + .equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) { + continue; // skip built-in shuffle-provider that was already inserted + // with shuffle secret key + } + if (auxNames.contains(shuffleProvider)) { + LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + + " to serviceData"); + // This only serves for INIT_APP notifications + // The shuffle service needs to be able to work with the host:port + // information provided by the AM + // (i.e. shuffle services which require custom location / other + // configuration are not supported) + serviceData.put(shuffleProvider, ByteBuffer.allocate(0)); + } else { + throw new YarnRuntimeException("ShuffleProvider Service: " + + shuffleProvider + + " was NOT found in the list of aux-services that are " + + "available in this NM. You may need to specify this " + + "ShuffleProvider as an aux-service in your yarn-site.xml"); + } + } + } + } + static ContainerLaunchContext createContainerLaunchContext( Map applicationACLs, Configuration conf, Token jobToken, Task remoteTask,