MAPREDUCE-6365. Refactor JobResourceUploader#uploadFilesInternal (Chris Trezzo via sjlee)
This commit is contained in:
parent
dc065dd64c
commit
8f0d3d69d6
@ -54,7 +54,7 @@ class JobResourceUploader {
|
||||
* @param submitJobDir the submission directory of the job
|
||||
* @throws IOException
|
||||
*/
|
||||
public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
public void uploadResources(Job job, Path submitJobDir) throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
short replication =
|
||||
(short) conf.getInt(Job.SUBMIT_REPLICATION,
|
||||
@ -66,12 +66,6 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
+ "with ToolRunner to remedy this.");
|
||||
}
|
||||
|
||||
// get all the command line arguments passed in by the user conf
|
||||
String files = conf.get("tmpfiles");
|
||||
String libjars = conf.get("tmpjars");
|
||||
String archives = conf.get("tmparchives");
|
||||
String jobJar = job.getJar();
|
||||
|
||||
//
|
||||
// Figure out what fs the JobTracker is using. Copy the
|
||||
// job to it, under a temporary name. This allows DFS to work,
|
||||
@ -92,12 +86,27 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
FsPermission mapredSysPerms =
|
||||
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
|
||||
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
|
||||
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
|
||||
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
|
||||
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
|
||||
// add all the command line files/ jars and archive
|
||||
// first copy them to jobtrackers filesystem
|
||||
|
||||
uploadFiles(conf, submitJobDir, mapredSysPerms, replication);
|
||||
uploadLibJars(conf, submitJobDir, mapredSysPerms, replication);
|
||||
uploadArchives(conf, submitJobDir, mapredSysPerms, replication);
|
||||
uploadJobJar(job, submitJobDir, replication);
|
||||
addLog4jToDistributedCache(job, submitJobDir);
|
||||
|
||||
// set the timestamps of the archives and files
|
||||
// set the public/private visibility of the archives and files
|
||||
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
|
||||
// get DelegationToken for cached file
|
||||
ClientDistributedCacheManager.getDelegationTokens(conf,
|
||||
job.getCredentials());
|
||||
}
|
||||
|
||||
private void uploadFiles(Configuration conf, Path submitJobDir,
|
||||
FsPermission mapredSysPerms, short submitReplication) throws IOException {
|
||||
String files = conf.get("tmpfiles");
|
||||
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
|
||||
if (files != null) {
|
||||
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
|
||||
String[] fileArr = files.split(",");
|
||||
@ -109,7 +118,7 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
Path tmp = new Path(tmpURI);
|
||||
Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
|
||||
Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
DistributedCache.addCacheFile(pathURI, conf);
|
||||
@ -119,13 +128,19 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadLibJars(Configuration conf, Path submitJobDir,
|
||||
FsPermission mapredSysPerms, short submitReplication) throws IOException {
|
||||
String libjars = conf.get("tmpjars");
|
||||
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
|
||||
if (libjars != null) {
|
||||
FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
|
||||
String[] libjarsArr = libjars.split(",");
|
||||
for (String tmpjars : libjarsArr) {
|
||||
Path tmp = new Path(tmpjars);
|
||||
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
|
||||
Path newPath =
|
||||
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
|
||||
|
||||
// Add each file to the classpath
|
||||
DistributedCache.addFileToClassPath(
|
||||
@ -140,7 +155,12 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadArchives(Configuration conf, Path submitJobDir,
|
||||
FsPermission mapredSysPerms, short submitReplication) throws IOException {
|
||||
String archives = conf.get("tmparchives");
|
||||
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
|
||||
if (archives != null) {
|
||||
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
|
||||
String[] archivesArr = archives.split(",");
|
||||
@ -152,7 +172,8 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
Path tmp = new Path(tmpURI);
|
||||
Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
|
||||
Path newPath =
|
||||
copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
DistributedCache.addCacheArchive(pathURI, conf);
|
||||
@ -162,7 +183,11 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadJobJar(Job job, Path submitJobDir, short submitReplication)
|
||||
throws IOException {
|
||||
String jobJar = job.getJar();
|
||||
if (jobJar != null) { // copy jar to JobTracker's fs
|
||||
// use jar name if job is not named.
|
||||
if ("".equals(job.getJobName())) {
|
||||
@ -174,22 +199,13 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
|
||||
// we don't need to copy it from local fs
|
||||
if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
|
||||
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
|
||||
replication);
|
||||
submitReplication);
|
||||
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
|
||||
}
|
||||
} else {
|
||||
LOG.warn("No job jar file set. User classes may not be found. "
|
||||
+ "See Job or Job#setJar(String).");
|
||||
}
|
||||
|
||||
addLog4jToDistributedCache(job, submitJobDir);
|
||||
|
||||
// set the timestamps of the archives and files
|
||||
// set the public/private visibility of the archives and files
|
||||
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
|
||||
// get DelegationToken for cached file
|
||||
ClientDistributedCacheManager.getDelegationTokens(conf,
|
||||
job.getCredentials());
|
||||
}
|
||||
|
||||
// copies a file to the jobtracker filesystem and returns the path where it
|
||||
|
@ -99,7 +99,7 @@ private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
|
||||
Job.DEFAULT_USE_WILDCARD_FOR_LIBJARS);
|
||||
JobResourceUploader rUploader = new JobResourceUploader(jtFs, useWildcards);
|
||||
|
||||
rUploader.uploadFiles(job, jobSubmitDir);
|
||||
rUploader.uploadResources(job, jobSubmitDir);
|
||||
|
||||
// Get the working directory. If not set, sets it to filesystem working dir
|
||||
// This code has been added so that working directory reset before running
|
||||
|
Loading…
Reference in New Issue
Block a user