MAPREDUCE-6824. TaskAttemptImpl#createCommonContainerLaunchContext is longer than 150 lines. Contributed by Chris Trezzo.

This commit is contained in:
Akira Ajisaka 2017-04-03 13:06:24 +09:00
parent a4b5aa8493
commit 845529b3ab

View File

@ -755,7 +755,7 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
new HashMap<String, LocalResource>(); new HashMap<String, LocalResource>();
// Application environment // Application environment
Map<String, String> environment = new HashMap<String, String>(); Map<String, String> environment;
// Service data // Service data
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
@ -763,9 +763,65 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
// Tokens // Tokens
ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{}); ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
try { try {
FileSystem remoteFS = FileSystem.get(conf);
// //////////// Set up JobJar to be localized properly on the remote NM. configureJobJar(conf, localResources);
configureJobConf(conf, localResources, oldJobId);
// Setup DistributedCache
MRApps.setupDistributedCache(conf, localResources);
taskCredentialsBuffer =
configureTokens(jobToken, credentials, serviceData);
addExternalShuffleProviders(conf, serviceData);
environment = configureEnv(conf);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
// Construct the actual Container
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container =
ContainerLaunchContext.newInstance(localResources, environment, null,
serviceData, taskCredentialsBuffer, applicationACLs);
return container;
}
private static Map<String, String> configureEnv(Configuration conf)
throws IOException {
Map<String, String> environment = new HashMap<String, String>();
MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
getInitialClasspath(conf), conf);
if (initialAppClasspath != null) {
MRApps.addToEnvironment(environment, Environment.APP_CLASSPATH.name(),
initialAppClasspath, conf);
}
// Shell
environment.put(Environment.SHELL.name(), conf
.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
// Add the env variables passed by the admin
MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV),
conf);
return environment;
}
private static void configureJobJar(Configuration conf,
Map<String, LocalResource> localResources) throws IOException {
// Set up JobJar to be localized properly on the remote NM.
String jobJar = conf.get(MRJobConfig.JAR); String jobJar = conf.get(MRJobConfig.JAR);
if (jobJar != null) { if (jobJar != null) {
final Path jobJarPath = new Path(jobJar); final Path jobJarPath = new Path(jobJar);
@ -786,30 +842,31 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
LOG.info("Job jar is not present. " LOG.info("Job jar is not present. "
+ "Not adding any jar to the list of resources."); + "Not adding any jar to the list of resources.");
} }
// //////////// End of JobJar setup }
// //////////// Set up JobConf to be localized properly on the remote NM. private static void configureJobConf(Configuration conf,
Path path = Map<String, LocalResource> localResources,
MRApps.getStagingAreaDir(conf, UserGroupInformation final org.apache.hadoop.mapred.JobID oldJobId) throws IOException {
.getCurrentUser().getShortUserName()); // Set up JobConf to be localized properly on the remote NM.
Path remoteJobSubmitDir = Path path = MRApps.getStagingAreaDir(conf,
new Path(path, oldJobId.toString()); UserGroupInformation.getCurrentUser().getShortUserName());
Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
Path remoteJobConfPath = Path remoteJobConfPath =
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
localResources.put( FileSystem remoteFS = FileSystem.get(conf);
MRJobConfig.JOB_CONF_FILE, localResources.put(MRJobConfig.JOB_CONF_FILE,
createLocalResource(remoteFS, remoteJobConfPath, createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LocalResourceVisibility.APPLICATION));
LOG.info("The job-conf file on the remote FS is " LOG.info("The job-conf file on the remote FS is "
+ remoteJobConfPath.toUri().toASCIIString()); + remoteJobConfPath.toUri().toASCIIString());
// //////////// End of JobConf setup }
// Setup DistributedCache
MRApps.setupDistributedCache(conf, localResources);
private static ByteBuffer configureTokens(Token<JobTokenIdentifier> jobToken,
Credentials credentials,
Map<String, ByteBuffer> serviceData) throws IOException {
// Setup up task credentials buffer // Setup up task credentials buffer
LOG.info("Adding #" + credentials.numberOfTokens() LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+ " tokens and #" + credentials.numberOfSecretKeys() + credentials.numberOfSecretKeys()
+ " secret keys for NM use for launching container"); + " secret keys for NM use for launching container");
Credentials taskCredentials = new Credentials(credentials); Credentials taskCredentials = new Credentials(credentials);
@ -818,10 +875,10 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
TokenCache.setJobToken(jobToken, taskCredentials); TokenCache.setJobToken(jobToken, taskCredentials);
DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
LOG.info("Size of containertokens_dob is " LOG.info(
+ taskCredentials.numberOfTokens()); "Size of containertokens_dob is " + taskCredentials.numberOfTokens());
taskCredentials.writeTokenStorageToStream(containerTokens_dob); taskCredentials.writeTokenStorageToStream(containerTokens_dob);
taskCredentialsBuffer = ByteBuffer taskCredentialsBuffer =
ByteBuffer.wrap(containerTokens_dob.getData(), 0, ByteBuffer.wrap(containerTokens_dob.getData(), 0,
containerTokens_dob.getLength()); containerTokens_dob.getLength());
@ -835,83 +892,47 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
+ " Using job token as shuffle secret."); + " Using job token as shuffle secret.");
shuffleSecret = jobToken.getPassword(); shuffleSecret = jobToken.getPassword();
} }
Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>( Token<JobTokenIdentifier> shuffleToken =
jobToken.getIdentifier(), shuffleSecret, jobToken.getKind(), new Token<JobTokenIdentifier>(jobToken.getIdentifier(), shuffleSecret,
jobToken.getService()); jobToken.getKind(), jobToken.getService());
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeServiceData(shuffleToken)); ShuffleHandler.serializeServiceData(shuffleToken));
return taskCredentialsBuffer;
}
private static void addExternalShuffleProviders(Configuration conf,
Map<String, ByteBuffer> serviceData) {
// add external shuffle-providers - if any // add external shuffle-providers - if any
Collection<String> shuffleProviders = conf.getStringCollection( Collection<String> shuffleProviders = conf.getStringCollection(
MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES); MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES);
if (! shuffleProviders.isEmpty()) { if (!shuffleProviders.isEmpty()) {
Collection<String> auxNames = conf.getStringCollection( Collection<String> auxNames =
YarnConfiguration.NM_AUX_SERVICES); conf.getStringCollection(YarnConfiguration.NM_AUX_SERVICES);
for (final String shuffleProvider : shuffleProviders) { for (final String shuffleProvider : shuffleProviders) {
if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) { if (shuffleProvider
continue; // skip built-in shuffle-provider that was already inserted with shuffle secret key .equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) {
continue; // skip built-in shuffle-provider that was already inserted
// with shuffle secret key
} }
if (auxNames.contains(shuffleProvider)) { if (auxNames.contains(shuffleProvider)) {
LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData"); LOG.info("Adding ShuffleProvider Service: " + shuffleProvider
+ " to serviceData");
// This only serves for INIT_APP notifications // This only serves for INIT_APP notifications
// The shuffle service needs to be able to work with the host:port information provided by the AM // The shuffle service needs to be able to work with the host:port
// (i.e. shuffle services which require custom location / other configuration are not supported) // information provided by the AM
// (i.e. shuffle services which require custom location / other
// configuration are not supported)
serviceData.put(shuffleProvider, ByteBuffer.allocate(0)); serviceData.put(shuffleProvider, ByteBuffer.allocate(0));
} } else {
else { throw new YarnRuntimeException("ShuffleProvider Service: "
throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider + + shuffleProvider
" was NOT found in the list of aux-services that are available in this NM." + + " was NOT found in the list of aux-services that are "
" You may need to specify this ShuffleProvider as an aux-service in your yarn-site.xml"); + "available in this NM. You may need to specify this "
+ "ShuffleProvider as an aux-service in your yarn-site.xml");
} }
} }
} }
MRApps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
getInitialClasspath(conf), conf);
if (initialAppClasspath != null) {
MRApps.addToEnvironment(
environment,
Environment.APP_CLASSPATH.name(),
initialAppClasspath, conf);
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
// Shell
environment.put(
Environment.SHELL.name(),
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_SHELL,
MRJobConfig.DEFAULT_SHELL)
);
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
MRApps.addToEnvironment(
environment,
Environment.LD_LIBRARY_PATH.name(),
MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
// Add the env variables passed by the admin
MRApps.setEnvFromInputString(
environment,
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf
);
// Construct the actual Container
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container =
ContainerLaunchContext.newInstance(localResources, environment, null,
serviceData, taskCredentialsBuffer, applicationACLs);
return container;
} }
static ContainerLaunchContext createContainerLaunchContext( static ContainerLaunchContext createContainerLaunchContext(