MAPREDUCE-7371. DistributedCache alternative APIs should not use DistributedCache APIs internally (#3855)
This commit is contained in:
parent
f64fda0f00
commit
08c803ea30
@ -45,7 +45,6 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -95,13 +94,13 @@ public synchronized void setup(JobConf conf, JobID jobId) throws IOException {
|
|||||||
|
|
||||||
// Find which resources are to be put on the local classpath
|
// Find which resources are to be put on the local classpath
|
||||||
Map<String, Path> classpaths = new HashMap<String, Path>();
|
Map<String, Path> classpaths = new HashMap<String, Path>();
|
||||||
Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
|
Path[] archiveClassPaths = JobContextImpl.getArchiveClassPaths(conf);
|
||||||
if (archiveClassPaths != null) {
|
if (archiveClassPaths != null) {
|
||||||
for (Path p : archiveClassPaths) {
|
for (Path p : archiveClassPaths) {
|
||||||
classpaths.put(p.toUri().getPath().toString(), p);
|
classpaths.put(p.toUri().getPath().toString(), p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
|
Path[] fileClassPaths = JobContextImpl.getFileClassPaths(conf);
|
||||||
if (fileClassPaths != null) {
|
if (fileClassPaths != null) {
|
||||||
for (Path p : fileClassPaths) {
|
for (Path p : fileClassPaths) {
|
||||||
classpaths.put(p.toUri().getPath().toString(), p);
|
classpaths.put(p.toUri().getPath().toString(), p);
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.TaskID;
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||||
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||||
@ -272,12 +273,12 @@ public static void setClasspath(Map<String, String> environment,
|
|||||||
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
|
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
|
||||||
// a * in the classpath will only find a .jar, so we need to filter out
|
// a * in the classpath will only find a .jar, so we need to filter out
|
||||||
// all .jars and add everything else
|
// all .jars and add everything else
|
||||||
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
|
addToClasspathIfNotJar(JobContextImpl.getFileClassPaths(conf),
|
||||||
DistributedCache.getCacheFiles(conf),
|
JobContextImpl.getCacheFiles(conf),
|
||||||
conf,
|
conf,
|
||||||
environment, classpathEnvVar);
|
environment, classpathEnvVar);
|
||||||
addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
|
addToClasspathIfNotJar(JobContextImpl.getArchiveClassPaths(conf),
|
||||||
DistributedCache.getCacheArchives(conf),
|
JobContextImpl.getCacheArchives(conf),
|
||||||
conf,
|
conf,
|
||||||
environment, classpathEnvVar);
|
environment, classpathEnvVar);
|
||||||
if (userClassesTakesPrecedence) {
|
if (userClassesTakesPrecedence) {
|
||||||
@ -483,8 +484,8 @@ public static void setupDistributedCache(Configuration conf,
|
|||||||
|
|
||||||
// Cache archives
|
// Cache archives
|
||||||
lrb.setType(LocalResourceType.ARCHIVE);
|
lrb.setType(LocalResourceType.ARCHIVE);
|
||||||
lrb.setUris(DistributedCache.getCacheArchives(conf));
|
lrb.setUris(JobContextImpl.getCacheArchives(conf));
|
||||||
lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf));
|
lrb.setTimestamps(JobContextImpl.getArchiveTimestamps(conf));
|
||||||
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
|
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
|
||||||
lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
|
lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
|
||||||
lrb.setSharedCacheUploadPolicies(
|
lrb.setSharedCacheUploadPolicies(
|
||||||
@ -493,8 +494,8 @@ public static void setupDistributedCache(Configuration conf,
|
|||||||
|
|
||||||
// Cache files
|
// Cache files
|
||||||
lrb.setType(LocalResourceType.FILE);
|
lrb.setType(LocalResourceType.FILE);
|
||||||
lrb.setUris(DistributedCache.getCacheFiles(conf));
|
lrb.setUris(JobContextImpl.getCacheFiles(conf));
|
||||||
lrb.setTimestamps(DistributedCache.getFileTimestamps(conf));
|
lrb.setTimestamps(JobContextImpl.getFileTimestamps(conf));
|
||||||
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
|
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
|
||||||
lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
|
lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
|
||||||
lrb.setSharedCacheUploadPolicies(
|
lrb.setSharedCacheUploadPolicies(
|
||||||
@ -504,9 +505,9 @@ public static void setupDistributedCache(Configuration conf,
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up the DistributedCache related configs to make
|
* Set up the DistributedCache related configs to make
|
||||||
* {@link DistributedCache#getLocalCacheFiles(Configuration)}
|
* {@link JobContextImpl#getLocalCacheFiles(Configuration)}
|
||||||
* and
|
* and
|
||||||
* {@link DistributedCache#getLocalCacheArchives(Configuration)}
|
* {@link JobContextImpl#getLocalCacheArchives(Configuration)}
|
||||||
* working.
|
* working.
|
||||||
* @param conf
|
* @param conf
|
||||||
* @throws java.io.IOException
|
* @throws java.io.IOException
|
||||||
@ -518,7 +519,7 @@ public static void setupDistributedCacheLocal(Configuration conf)
|
|||||||
// ^ ^ all symlinks are created in the current work-dir
|
// ^ ^ all symlinks are created in the current work-dir
|
||||||
|
|
||||||
// Update the configuration object with localized archives.
|
// Update the configuration object with localized archives.
|
||||||
URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
|
URI[] cacheArchives = JobContextImpl.getCacheArchives(conf);
|
||||||
if (cacheArchives != null) {
|
if (cacheArchives != null) {
|
||||||
List<String> localArchives = new ArrayList<String>();
|
List<String> localArchives = new ArrayList<String>();
|
||||||
for (int i = 0; i < cacheArchives.length; ++i) {
|
for (int i = 0; i < cacheArchives.length; ++i) {
|
||||||
@ -538,7 +539,7 @@ public static void setupDistributedCacheLocal(Configuration conf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update the configuration object with localized files.
|
// Update the configuration object with localized files.
|
||||||
URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
|
URI[] cacheFiles = JobContextImpl.getCacheFiles(conf);
|
||||||
if (cacheFiles != null) {
|
if (cacheFiles != null) {
|
||||||
List<String> localFiles = new ArrayList<String>();
|
List<String> localFiles = new ArrayList<String>();
|
||||||
for (int i = 0; i < cacheFiles.length; ++i) {
|
for (int i = 0; i < cacheFiles.length; ++i) {
|
||||||
|
@ -51,7 +51,6 @@
|
|||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -174,7 +173,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
DistributedCache.addCacheFile(file, conf);
|
Job.addCacheFile(file, conf);
|
||||||
Map<String, Boolean> policies = new HashMap<String, Boolean>();
|
Map<String, Boolean> policies = new HashMap<String, Boolean>();
|
||||||
policies.put(file.toString(), true);
|
policies.put(file.toString(), true);
|
||||||
Job.setFileSharedCacheUploadPolicies(conf, policies);
|
Job.setFileSharedCacheUploadPolicies(conf, policies);
|
||||||
@ -286,8 +285,8 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
DistributedCache.addCacheFile(file, conf);
|
Job.addCacheFile(file, conf);
|
||||||
DistributedCache.addCacheFile(file, conf);
|
Job.addCacheFile(file, conf);
|
||||||
Map<String, Boolean> policies = new HashMap<String, Boolean>();
|
Map<String, Boolean> policies = new HashMap<String, Boolean>();
|
||||||
policies.put(file.toString(), true);
|
policies.put(file.toString(), true);
|
||||||
Job.setFileSharedCacheUploadPolicies(conf, policies);
|
Job.setFileSharedCacheUploadPolicies(conf, policies);
|
||||||
|
@ -229,60 +229,60 @@ public void testDeprecatedFunctions() throws Exception {
|
|||||||
Assert.assertEquals("Test Local Archives 1",
|
Assert.assertEquals("Test Local Archives 1",
|
||||||
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
|
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getLocalCacheArchives(conf).length);
|
JobContextImpl.getLocalCacheArchives(conf).length);
|
||||||
Assert.assertEquals("Test Local Archives 1",
|
Assert.assertEquals("Test Local Archives 1",
|
||||||
DistributedCache.getLocalCacheArchives(conf)[0].getName());
|
JobContextImpl.getLocalCacheArchives(conf)[0].getName());
|
||||||
DistributedCache.addLocalArchives(conf, "Test Local Archives 2");
|
DistributedCache.addLocalArchives(conf, "Test Local Archives 2");
|
||||||
Assert.assertEquals("Test Local Archives 1,Test Local Archives 2",
|
Assert.assertEquals("Test Local Archives 1,Test Local Archives 2",
|
||||||
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
|
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
DistributedCache.getLocalCacheArchives(conf).length);
|
JobContextImpl.getLocalCacheArchives(conf).length);
|
||||||
Assert.assertEquals("Test Local Archives 2",
|
Assert.assertEquals("Test Local Archives 2",
|
||||||
DistributedCache.getLocalCacheArchives(conf)[1].getName());
|
JobContextImpl.getLocalCacheArchives(conf)[1].getName());
|
||||||
DistributedCache.setLocalArchives(conf, "Test Local Archives 3");
|
DistributedCache.setLocalArchives(conf, "Test Local Archives 3");
|
||||||
Assert.assertEquals("Test Local Archives 3",
|
Assert.assertEquals("Test Local Archives 3",
|
||||||
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
|
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getLocalCacheArchives(conf).length);
|
JobContextImpl.getLocalCacheArchives(conf).length);
|
||||||
Assert.assertEquals("Test Local Archives 3",
|
Assert.assertEquals("Test Local Archives 3",
|
||||||
DistributedCache.getLocalCacheArchives(conf)[0].getName());
|
JobContextImpl.getLocalCacheArchives(conf)[0].getName());
|
||||||
|
|
||||||
DistributedCache.addLocalFiles(conf, "Test Local Files 1");
|
DistributedCache.addLocalFiles(conf, "Test Local Files 1");
|
||||||
Assert.assertEquals("Test Local Files 1",
|
Assert.assertEquals("Test Local Files 1",
|
||||||
conf.get(DistributedCache.CACHE_LOCALFILES));
|
conf.get(DistributedCache.CACHE_LOCALFILES));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getLocalCacheFiles(conf).length);
|
JobContextImpl.getLocalCacheFiles(conf).length);
|
||||||
Assert.assertEquals("Test Local Files 1",
|
Assert.assertEquals("Test Local Files 1",
|
||||||
DistributedCache.getLocalCacheFiles(conf)[0].getName());
|
JobContextImpl.getLocalCacheFiles(conf)[0].getName());
|
||||||
DistributedCache.addLocalFiles(conf, "Test Local Files 2");
|
DistributedCache.addLocalFiles(conf, "Test Local Files 2");
|
||||||
Assert.assertEquals("Test Local Files 1,Test Local Files 2",
|
Assert.assertEquals("Test Local Files 1,Test Local Files 2",
|
||||||
conf.get(DistributedCache.CACHE_LOCALFILES));
|
conf.get(DistributedCache.CACHE_LOCALFILES));
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
DistributedCache.getLocalCacheFiles(conf).length);
|
JobContextImpl.getLocalCacheFiles(conf).length);
|
||||||
Assert.assertEquals("Test Local Files 2",
|
Assert.assertEquals("Test Local Files 2",
|
||||||
DistributedCache.getLocalCacheFiles(conf)[1].getName());
|
JobContextImpl.getLocalCacheFiles(conf)[1].getName());
|
||||||
DistributedCache.setLocalFiles(conf, "Test Local Files 3");
|
DistributedCache.setLocalFiles(conf, "Test Local Files 3");
|
||||||
Assert.assertEquals("Test Local Files 3",
|
Assert.assertEquals("Test Local Files 3",
|
||||||
conf.get(DistributedCache.CACHE_LOCALFILES));
|
conf.get(DistributedCache.CACHE_LOCALFILES));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getLocalCacheFiles(conf).length);
|
JobContextImpl.getLocalCacheFiles(conf).length);
|
||||||
Assert.assertEquals("Test Local Files 3",
|
Assert.assertEquals("Test Local Files 3",
|
||||||
DistributedCache.getLocalCacheFiles(conf)[0].getName());
|
JobContextImpl.getLocalCacheFiles(conf)[0].getName());
|
||||||
|
|
||||||
DistributedCache.setArchiveTimestamps(conf, "1234567890");
|
DistributedCache.setArchiveTimestamps(conf, "1234567890");
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
conf.getLong(DistributedCache.CACHE_ARCHIVES_TIMESTAMPS, 0));
|
conf.getLong(DistributedCache.CACHE_ARCHIVES_TIMESTAMPS, 0));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getArchiveTimestamps(conf).length);
|
JobContextImpl.getArchiveTimestamps(conf).length);
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
DistributedCache.getArchiveTimestamps(conf)[0]);
|
JobContextImpl.getArchiveTimestamps(conf)[0]);
|
||||||
DistributedCache.setFileTimestamps(conf, "1234567890");
|
DistributedCache.setFileTimestamps(conf, "1234567890");
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0));
|
conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getFileTimestamps(conf).length);
|
JobContextImpl.getFileTimestamps(conf).length);
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
DistributedCache.getFileTimestamps(conf)[0]);
|
JobContextImpl.getFileTimestamps(conf)[0]);
|
||||||
|
|
||||||
DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"),
|
DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"),
|
||||||
new File("Test Work Dir"));
|
new File("Test Work Dir"));
|
||||||
@ -297,18 +297,18 @@ public void testDeprecatedFunctions() throws Exception {
|
|||||||
DistributedCache.getTimestamp(conf, symlinkFile.toURI()));
|
DistributedCache.getTimestamp(conf, symlinkFile.toURI()));
|
||||||
Assert.assertTrue(symlinkFile.delete());
|
Assert.assertTrue(symlinkFile.delete());
|
||||||
|
|
||||||
DistributedCache.addCacheArchive(symlinkFile.toURI(), conf);
|
Job.addCacheArchive(symlinkFile.toURI(), conf);
|
||||||
Assert.assertEquals(symlinkFile.toURI().toString(),
|
Assert.assertEquals(symlinkFile.toURI().toString(),
|
||||||
conf.get(DistributedCache.CACHE_ARCHIVES));
|
conf.get(DistributedCache.CACHE_ARCHIVES));
|
||||||
Assert.assertEquals(1, DistributedCache.getCacheArchives(conf).length);
|
Assert.assertEquals(1, JobContextImpl.getCacheArchives(conf).length);
|
||||||
Assert.assertEquals(symlinkFile.toURI(),
|
Assert.assertEquals(symlinkFile.toURI(),
|
||||||
DistributedCache.getCacheArchives(conf)[0]);
|
JobContextImpl.getCacheArchives(conf)[0]);
|
||||||
|
|
||||||
DistributedCache.addCacheFile(symlinkFile.toURI(), conf);
|
Job.addCacheFile(symlinkFile.toURI(), conf);
|
||||||
Assert.assertEquals(symlinkFile.toURI().toString(),
|
Assert.assertEquals(symlinkFile.toURI().toString(),
|
||||||
conf.get(DistributedCache.CACHE_FILES));
|
conf.get(DistributedCache.CACHE_FILES));
|
||||||
Assert.assertEquals(1, DistributedCache.getCacheFiles(conf).length);
|
Assert.assertEquals(1, JobContextImpl.getCacheFiles(conf).length);
|
||||||
Assert.assertEquals(symlinkFile.toURI(),
|
Assert.assertEquals(symlinkFile.toURI(),
|
||||||
DistributedCache.getCacheFiles(conf)[0]);
|
JobContextImpl.getCacheFiles(conf)[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,6 @@
|
|||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
@ -377,11 +376,11 @@ public void testSetupDistributedCacheConflicts() throws Exception {
|
|||||||
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
||||||
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
||||||
|
|
||||||
DistributedCache.addCacheArchive(archive, conf);
|
Job.addCacheArchive(archive, conf);
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
||||||
DistributedCache.addCacheFile(file, conf);
|
Job.addCacheFile(file, conf);
|
||||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
||||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
||||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
||||||
@ -416,8 +415,8 @@ public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
|||||||
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
||||||
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
|
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
|
||||||
|
|
||||||
DistributedCache.addCacheFile(file, conf);
|
Job.addCacheFile(file, conf);
|
||||||
DistributedCache.addCacheFile(file2, conf);
|
Job.addCacheFile(file2, conf);
|
||||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
|
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
|
||||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
|
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
|
||||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
|
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
|
||||||
@ -452,11 +451,11 @@ public void testSetupDistributedCache() throws Exception {
|
|||||||
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
||||||
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
||||||
|
|
||||||
DistributedCache.addCacheArchive(archive, conf);
|
Job.addCacheArchive(archive, conf);
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
||||||
DistributedCache.addCacheFile(file, conf);
|
Job.addCacheFile(file, conf);
|
||||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
||||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
||||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
||||||
|
@ -48,11 +48,11 @@
|
|||||||
import org.apache.hadoop.mapred.TaskAttemptID;
|
import org.apache.hadoop.mapred.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapred.TaskLog;
|
import org.apache.hadoop.mapred.TaskLog;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
||||||
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
||||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||||
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
@ -117,7 +117,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
|||||||
if (interpretor != null) {
|
if (interpretor != null) {
|
||||||
cmd.add(interpretor);
|
cmd.add(interpretor);
|
||||||
}
|
}
|
||||||
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
|
String executable = JobContextImpl.getLocalCacheFiles(conf)[0].toString();
|
||||||
if (!FileUtil.canExecute(new File(executable))) {
|
if (!FileUtil.canExecute(new File(executable))) {
|
||||||
// LinuxTaskController sets +x permissions on all distcache files already.
|
// LinuxTaskController sets +x permissions on all distcache files already.
|
||||||
// In case of DefaultTaskController, set permissions here.
|
// In case of DefaultTaskController, set permissions here.
|
||||||
|
@ -54,8 +54,9 @@
|
|||||||
import org.apache.hadoop.mapred.lib.HashPartitioner;
|
import org.apache.hadoop.mapred.lib.HashPartitioner;
|
||||||
import org.apache.hadoop.mapred.lib.LazyOutputFormat;
|
import org.apache.hadoop.mapred.lib.LazyOutputFormat;
|
||||||
import org.apache.hadoop.mapred.lib.NullOutputFormat;
|
import org.apache.hadoop.mapred.lib.NullOutputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
@ -319,7 +320,7 @@ private static void setupPipesJob(JobConf conf) throws IOException {
|
|||||||
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
|
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
|
||||||
setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT,defScript);
|
setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT,defScript);
|
||||||
}
|
}
|
||||||
URI[] fileCache = DistributedCache.getCacheFiles(conf);
|
URI[] fileCache = JobContextImpl.getCacheFiles(conf);
|
||||||
if (fileCache == null) {
|
if (fileCache == null) {
|
||||||
fileCache = new URI[1];
|
fileCache = new URI[1];
|
||||||
} else {
|
} else {
|
||||||
@ -334,7 +335,7 @@ private static void setupPipesJob(JobConf conf) throws IOException {
|
|||||||
ie.initCause(e);
|
ie.initCause(e);
|
||||||
throw ie;
|
throw ie;
|
||||||
}
|
}
|
||||||
DistributedCache.setCacheFiles(fileCache, conf);
|
Job.setCacheFiles(fileCache, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1121,7 +1121,18 @@ public void setJobSetupCleanupNeeded(boolean needed) {
|
|||||||
*/
|
*/
|
||||||
public void setCacheArchives(URI[] archives) {
|
public void setCacheArchives(URI[] archives) {
|
||||||
ensureState(JobState.DEFINE);
|
ensureState(JobState.DEFINE);
|
||||||
DistributedCache.setCacheArchives(archives, conf);
|
setCacheArchives(archives, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the configuration with the given set of archives.
|
||||||
|
*
|
||||||
|
* @param archives The list of archives that need to be localized.
|
||||||
|
* @param conf Configuration which will be changed.
|
||||||
|
*/
|
||||||
|
public static void setCacheArchives(URI[] archives, Configuration conf) {
|
||||||
|
String cacheArchives = StringUtils.uriToString(archives);
|
||||||
|
conf.set(MRJobConfig.CACHE_ARCHIVES, cacheArchives);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1130,7 +1141,18 @@ public void setCacheArchives(URI[] archives) {
|
|||||||
*/
|
*/
|
||||||
public void setCacheFiles(URI[] files) {
|
public void setCacheFiles(URI[] files) {
|
||||||
ensureState(JobState.DEFINE);
|
ensureState(JobState.DEFINE);
|
||||||
DistributedCache.setCacheFiles(files, conf);
|
setCacheFiles(files, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the configuration with the given set of files.
|
||||||
|
*
|
||||||
|
* @param files The list of files that need to be localized.
|
||||||
|
* @param conf Configuration which will be changed.
|
||||||
|
*/
|
||||||
|
public static void setCacheFiles(URI[] files, Configuration conf) {
|
||||||
|
String cacheFiles = StringUtils.uriToString(files);
|
||||||
|
conf.set(MRJobConfig.CACHE_FILES, cacheFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1139,16 +1161,53 @@ public void setCacheFiles(URI[] files) {
|
|||||||
*/
|
*/
|
||||||
public void addCacheArchive(URI uri) {
|
public void addCacheArchive(URI uri) {
|
||||||
ensureState(JobState.DEFINE);
|
ensureState(JobState.DEFINE);
|
||||||
DistributedCache.addCacheArchive(uri, conf);
|
addCacheArchive(uri, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an archives to be localized to the conf.
|
||||||
|
*
|
||||||
|
* @param uri The uri of the cache to be localized.
|
||||||
|
* @param conf Configuration to add the cache to.
|
||||||
|
*/
|
||||||
|
public static void addCacheArchive(URI uri, Configuration conf) {
|
||||||
|
String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
|
||||||
|
conf.set(MRJobConfig.CACHE_ARCHIVES,
|
||||||
|
archives == null ? uri.toString() : archives + "," + uri.toString());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a file to be localized
|
* Add a file to be localized
|
||||||
* @param uri The uri of the cache to be localized
|
* @param uri The uri of the cache to be localized
|
||||||
*/
|
*/
|
||||||
public void addCacheFile(URI uri) {
|
public void addCacheFile(URI uri) {
|
||||||
ensureState(JobState.DEFINE);
|
ensureState(JobState.DEFINE);
|
||||||
DistributedCache.addCacheFile(uri, conf);
|
addCacheFile(uri, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a file to be localized to the conf. The localized file will be
|
||||||
|
* downloaded to the execution node(s), and a link will be created to the
|
||||||
|
* file from the job's working directory. If the last part of URI's path name
|
||||||
|
* is "*", then the entire parent directory will be localized and links
|
||||||
|
* will be created from the job's working directory to each file in the
|
||||||
|
* parent directory.
|
||||||
|
* <p>
|
||||||
|
* The access permissions of the file will determine whether the localized
|
||||||
|
* file will be shared across jobs. If the file is not readable by other or
|
||||||
|
* if any of its parent directories is not executable by other, then the
|
||||||
|
* file will not be shared. In the case of a path that ends in "/*",
|
||||||
|
* sharing of the localized files will be determined solely from the
|
||||||
|
* access permissions of the parent directories. The access permissions of
|
||||||
|
* the individual files will be ignored.
|
||||||
|
*
|
||||||
|
* @param uri The uri of the cache to be localized.
|
||||||
|
* @param conf Configuration to add the cache to.
|
||||||
|
*/
|
||||||
|
public static void addCacheFile(URI uri, Configuration conf) {
|
||||||
|
String files = conf.get(MRJobConfig.CACHE_FILES);
|
||||||
|
conf.set(MRJobConfig.CACHE_FILES,
|
||||||
|
files == null ? uri.toString() : files + "," + uri.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1165,7 +1224,39 @@ public void addCacheFile(URI uri) {
|
|||||||
public void addFileToClassPath(Path file)
|
public void addFileToClassPath(Path file)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ensureState(JobState.DEFINE);
|
ensureState(JobState.DEFINE);
|
||||||
DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
|
addFileToClassPath(file, conf, file.getFileSystem(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a file path to the current set of classpath entries. The file will
|
||||||
|
* also be added to the cache.
|
||||||
|
*
|
||||||
|
* @param file Path of the file to be added.
|
||||||
|
* @param conf Configuration that contains the classpath setting.
|
||||||
|
* @param fs FileSystem with respect to which {@code file} should be interpreted.
|
||||||
|
*/
|
||||||
|
public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) {
|
||||||
|
addFileToClassPath(file, conf, fs, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a file path to the current set of classpath entries. The file will
|
||||||
|
* also be added to the cache if {@code addToCache} is true.
|
||||||
|
*
|
||||||
|
* @param file Path of the file to be added.
|
||||||
|
* @param conf Configuration that contains the classpath setting.
|
||||||
|
* @param fs FileSystem with respect to which {@code file} should be interpreted.
|
||||||
|
* @param addToCache Whether the file should also be added to the cache list.
|
||||||
|
*/
|
||||||
|
public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs,
|
||||||
|
boolean addToCache) {
|
||||||
|
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
|
||||||
|
conf.set(MRJobConfig.CLASSPATH_FILES,
|
||||||
|
classpath == null ? file.toString() : classpath + "," + file.toString());
|
||||||
|
if (addToCache) {
|
||||||
|
URI uri = fs.makeQualified(file).toUri();
|
||||||
|
Job.addCacheFile(uri, conf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1180,7 +1271,23 @@ public void addFileToClassPath(Path file)
|
|||||||
public void addArchiveToClassPath(Path archive)
|
public void addArchiveToClassPath(Path archive)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ensureState(JobState.DEFINE);
|
ensureState(JobState.DEFINE);
|
||||||
DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
|
addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an archive path to the current set of classpath entries. It adds the
|
||||||
|
* archive to cache as well.
|
||||||
|
*
|
||||||
|
* @param archive Path of the archive to be added.
|
||||||
|
* @param conf Configuration that contains the classpath setting.
|
||||||
|
* @param fs FileSystem with respect to which {@code archive} should be interpreted.
|
||||||
|
*/
|
||||||
|
public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs) {
|
||||||
|
String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES);
|
||||||
|
conf.set(MRJobConfig.CLASSPATH_ARCHIVES,
|
||||||
|
classpath == null ? archive.toString() : classpath + "," + archive.toString());
|
||||||
|
URI uri = fs.makeQualified(archive).toUri();
|
||||||
|
Job.addCacheArchive(uri, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -332,13 +332,12 @@ void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
|
|||||||
// separately.
|
// separately.
|
||||||
foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
|
foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
|
||||||
}
|
}
|
||||||
DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf,
|
Job.addFileToClassPath(new Path(newURI.getPath()), conf, jtFs, false);
|
||||||
jtFs, false);
|
|
||||||
if (fromSharedCache) {
|
if (fromSharedCache) {
|
||||||
// We simply add this URI to the distributed cache. It will not come
|
// We simply add this URI to the distributed cache. It will not come
|
||||||
// from the staging directory (it is in the shared cache), so we
|
// from the staging directory (it is in the shared cache), so we
|
||||||
// must add it to the cache regardless of the wildcard feature.
|
// must add it to the cache regardless of the wildcard feature.
|
||||||
DistributedCache.addCacheFile(newURI, conf);
|
Job.addCacheFile(newURI, conf);
|
||||||
} else {
|
} else {
|
||||||
libjarURIs.add(newURI);
|
libjarURIs.add(newURI);
|
||||||
}
|
}
|
||||||
@ -352,10 +351,10 @@ void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
|
|||||||
// Add the whole directory to the cache using a wild card
|
// Add the whole directory to the cache using a wild card
|
||||||
Path libJarsDirWildcard =
|
Path libJarsDirWildcard =
|
||||||
jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
|
jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
|
||||||
DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
|
Job.addCacheFile(libJarsDirWildcard.toUri(), conf);
|
||||||
} else {
|
} else {
|
||||||
for (URI uri : libjarURIs) {
|
for (URI uri : libjarURIs) {
|
||||||
DistributedCache.addCacheFile(uri, conf);
|
Job.addCacheFile(uri, conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -847,8 +846,8 @@ private void copyLog4jPropertyFile(Job job, Path submitJobDir,
|
|||||||
}
|
}
|
||||||
Path tmp = new Path(tmpURI);
|
Path tmp = new Path(tmpURI);
|
||||||
Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
|
Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
|
||||||
DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()),
|
Path path = new Path(newPath.toUri().getPath());
|
||||||
conf);
|
Job.addFileToClassPath(path, conf, path.getFileSystem(conf));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,7 +466,7 @@ private static void addMRFrameworkToDistributedCache(Configuration conf)
|
|||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
DistributedCache.addCacheArchive(uri, conf);
|
Job.addCacheArchive(uri, conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -82,7 +83,7 @@ public static void determineTimestampsAndCacheVisibilities(Configuration job,
|
|||||||
*/
|
*/
|
||||||
public static void determineTimestamps(Configuration job,
|
public static void determineTimestamps(Configuration job,
|
||||||
Map<URI, FileStatus> statCache) throws IOException {
|
Map<URI, FileStatus> statCache) throws IOException {
|
||||||
URI[] tarchives = DistributedCache.getCacheArchives(job);
|
URI[] tarchives = JobContextImpl.getCacheArchives(job);
|
||||||
if (tarchives != null) {
|
if (tarchives != null) {
|
||||||
FileStatus status = getFileStatus(job, tarchives[0], statCache);
|
FileStatus status = getFileStatus(job, tarchives[0], statCache);
|
||||||
StringBuilder archiveFileSizes =
|
StringBuilder archiveFileSizes =
|
||||||
@ -100,7 +101,7 @@ public static void determineTimestamps(Configuration job,
|
|||||||
setArchiveTimestamps(job, archiveTimestamps.toString());
|
setArchiveTimestamps(job, archiveTimestamps.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
URI[] tfiles = DistributedCache.getCacheFiles(job);
|
URI[] tfiles = JobContextImpl.getCacheFiles(job);
|
||||||
if (tfiles != null) {
|
if (tfiles != null) {
|
||||||
FileStatus status = getFileStatus(job, tfiles[0], statCache);
|
FileStatus status = getFileStatus(job, tfiles[0], statCache);
|
||||||
StringBuilder fileSizes =
|
StringBuilder fileSizes =
|
||||||
@ -127,8 +128,8 @@ public static void determineTimestamps(Configuration job,
|
|||||||
*/
|
*/
|
||||||
public static void getDelegationTokens(Configuration job,
|
public static void getDelegationTokens(Configuration job,
|
||||||
Credentials credentials) throws IOException {
|
Credentials credentials) throws IOException {
|
||||||
URI[] tarchives = DistributedCache.getCacheArchives(job);
|
URI[] tarchives = JobContextImpl.getCacheArchives(job);
|
||||||
URI[] tfiles = DistributedCache.getCacheFiles(job);
|
URI[] tfiles = JobContextImpl.getCacheFiles(job);
|
||||||
|
|
||||||
int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
|
int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
|
||||||
Path[] ps = new Path[size];
|
Path[] ps = new Path[size];
|
||||||
@ -159,7 +160,7 @@ public static void getDelegationTokens(Configuration job,
|
|||||||
*/
|
*/
|
||||||
public static void determineCacheVisibilities(Configuration job,
|
public static void determineCacheVisibilities(Configuration job,
|
||||||
Map<URI, FileStatus> statCache) throws IOException {
|
Map<URI, FileStatus> statCache) throws IOException {
|
||||||
URI[] tarchives = DistributedCache.getCacheArchives(job);
|
URI[] tarchives = JobContextImpl.getCacheArchives(job);
|
||||||
if (tarchives != null) {
|
if (tarchives != null) {
|
||||||
StringBuilder archiveVisibilities =
|
StringBuilder archiveVisibilities =
|
||||||
new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache)));
|
new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache)));
|
||||||
@ -169,7 +170,7 @@ public static void determineCacheVisibilities(Configuration job,
|
|||||||
}
|
}
|
||||||
setArchiveVisibilities(job, archiveVisibilities.toString());
|
setArchiveVisibilities(job, archiveVisibilities.toString());
|
||||||
}
|
}
|
||||||
URI[] tfiles = DistributedCache.getCacheFiles(job);
|
URI[] tfiles = JobContextImpl.getCacheFiles(job);
|
||||||
if (tfiles != null) {
|
if (tfiles != null) {
|
||||||
StringBuilder fileVisibilities =
|
StringBuilder fileVisibilities =
|
||||||
new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache)));
|
new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache)));
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.*;
|
import org.apache.hadoop.conf.*;
|
||||||
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.util.*;
|
import org.apache.hadoop.util.*;
|
||||||
import org.apache.hadoop.fs.*;
|
import org.apache.hadoop.fs.*;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@ -145,8 +146,7 @@ public class DistributedCache {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static void setCacheArchives(URI[] archives, Configuration conf) {
|
public static void setCacheArchives(URI[] archives, Configuration conf) {
|
||||||
String sarchives = StringUtils.uriToString(archives);
|
Job.setCacheArchives(archives, conf);
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES, sarchives);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -159,8 +159,7 @@ public static void setCacheArchives(URI[] archives, Configuration conf) {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static void setCacheFiles(URI[] files, Configuration conf) {
|
public static void setCacheFiles(URI[] files, Configuration conf) {
|
||||||
String sfiles = StringUtils.uriToString(files);
|
Job.setCacheFiles(files, conf);
|
||||||
conf.set(MRJobConfig.CACHE_FILES, sfiles);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -174,7 +173,7 @@ public static void setCacheFiles(URI[] files, Configuration conf) {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static URI[] getCacheArchives(Configuration conf) throws IOException {
|
public static URI[] getCacheArchives(Configuration conf) throws IOException {
|
||||||
return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_ARCHIVES));
|
return JobContextImpl.getCacheArchives(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -188,7 +187,7 @@ public static URI[] getCacheArchives(Configuration conf) throws IOException {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static URI[] getCacheFiles(Configuration conf) throws IOException {
|
public static URI[] getCacheFiles(Configuration conf) throws IOException {
|
||||||
return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_FILES));
|
return JobContextImpl.getCacheFiles(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -201,10 +200,8 @@ public static URI[] getCacheFiles(Configuration conf) throws IOException {
|
|||||||
* @see JobContext#getLocalCacheArchives()
|
* @see JobContext#getLocalCacheArchives()
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static Path[] getLocalCacheArchives(Configuration conf)
|
public static Path[] getLocalCacheArchives(Configuration conf) throws IOException {
|
||||||
throws IOException {
|
return JobContextImpl.getLocalCacheArchives(conf);
|
||||||
return StringUtils.stringToPath(conf
|
|
||||||
.getStrings(MRJobConfig.CACHE_LOCALARCHIVES));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -219,23 +216,7 @@ public static Path[] getLocalCacheArchives(Configuration conf)
|
|||||||
@Deprecated
|
@Deprecated
|
||||||
public static Path[] getLocalCacheFiles(Configuration conf)
|
public static Path[] getLocalCacheFiles(Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES));
|
return JobContextImpl.getLocalCacheFiles(conf);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Parse a list of strings into longs.
|
|
||||||
* @param strs the list of strings to parse
|
|
||||||
* @return a list of longs that were parsed. same length as strs.
|
|
||||||
*/
|
|
||||||
private static long[] parseTimestamps(String[] strs) {
|
|
||||||
if (strs == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
long[] result = new long[strs.length];
|
|
||||||
for(int i=0; i < strs.length; ++i) {
|
|
||||||
result[i] = Long.parseLong(strs[i]);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -248,8 +229,7 @@ private static long[] parseTimestamps(String[] strs) {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static long[] getArchiveTimestamps(Configuration conf) {
|
public static long[] getArchiveTimestamps(Configuration conf) {
|
||||||
return parseTimestamps(
|
return JobContextImpl.getArchiveTimestamps(conf);
|
||||||
conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -263,8 +243,7 @@ public static long[] getArchiveTimestamps(Configuration conf) {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static long[] getFileTimestamps(Configuration conf) {
|
public static long[] getFileTimestamps(Configuration conf) {
|
||||||
return parseTimestamps(
|
return JobContextImpl.getFileTimestamps(conf);
|
||||||
conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -277,9 +256,7 @@ public static long[] getFileTimestamps(Configuration conf) {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static void addCacheArchive(URI uri, Configuration conf) {
|
public static void addCacheArchive(URI uri, Configuration conf) {
|
||||||
String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
|
Job.addCacheArchive(uri, conf);
|
||||||
conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
|
|
||||||
: archives + "," + uri.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -307,9 +284,7 @@ public static void addCacheArchive(URI uri, Configuration conf) {
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static void addCacheFile(URI uri, Configuration conf) {
|
public static void addCacheFile(URI uri, Configuration conf) {
|
||||||
String files = conf.get(MRJobConfig.CACHE_FILES);
|
Job.addCacheFile(uri, conf);
|
||||||
conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + ","
|
|
||||||
+ uri.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -323,9 +298,8 @@ public static void addCacheFile(URI uri, Configuration conf) {
|
|||||||
* @see Job#addFileToClassPath(Path)
|
* @see Job#addFileToClassPath(Path)
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static void addFileToClassPath(Path file, Configuration conf)
|
public static void addFileToClassPath(Path file, Configuration conf) throws IOException {
|
||||||
throws IOException {
|
Job.addFileToClassPath(file, conf, file.getFileSystem(conf));
|
||||||
addFileToClassPath(file, conf, file.getFileSystem(conf));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -340,7 +314,7 @@ public static void addFileToClassPath(Path file, Configuration conf)
|
|||||||
*/
|
*/
|
||||||
public static void addFileToClassPath(Path file, Configuration conf,
|
public static void addFileToClassPath(Path file, Configuration conf,
|
||||||
FileSystem fs) {
|
FileSystem fs) {
|
||||||
addFileToClassPath(file, conf, fs, true);
|
Job.addFileToClassPath(file, conf, fs, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -357,14 +331,7 @@ public static void addFileToClassPath(Path file, Configuration conf,
|
|||||||
*/
|
*/
|
||||||
public static void addFileToClassPath(Path file, Configuration conf,
|
public static void addFileToClassPath(Path file, Configuration conf,
|
||||||
FileSystem fs, boolean addToCache) {
|
FileSystem fs, boolean addToCache) {
|
||||||
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
|
Job.addFileToClassPath(file, conf, fs, addToCache);
|
||||||
conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
|
|
||||||
: classpath + "," + file.toString());
|
|
||||||
|
|
||||||
if (addToCache) {
|
|
||||||
URI uri = fs.makeQualified(file).toUri();
|
|
||||||
addCacheFile(uri, conf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -377,16 +344,7 @@ public static void addFileToClassPath(Path file, Configuration conf,
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static Path[] getFileClassPaths(Configuration conf) {
|
public static Path[] getFileClassPaths(Configuration conf) {
|
||||||
ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
|
return JobContextImpl.getFileClassPaths(conf);
|
||||||
MRJobConfig.CLASSPATH_FILES);
|
|
||||||
if (list.size() == 0) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
Path[] paths = new Path[list.size()];
|
|
||||||
for (int i = 0; i < list.size(); i++) {
|
|
||||||
paths[i] = new Path(list.get(i));
|
|
||||||
}
|
|
||||||
return paths;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -401,7 +359,7 @@ public static Path[] getFileClassPaths(Configuration conf) {
|
|||||||
@Deprecated
|
@Deprecated
|
||||||
public static void addArchiveToClassPath(Path archive, Configuration conf)
|
public static void addArchiveToClassPath(Path archive, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
|
Job.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -415,12 +373,7 @@ public static void addArchiveToClassPath(Path archive, Configuration conf)
|
|||||||
public static void addArchiveToClassPath
|
public static void addArchiveToClassPath
|
||||||
(Path archive, Configuration conf, FileSystem fs)
|
(Path archive, Configuration conf, FileSystem fs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES);
|
Job.addArchiveToClassPath(archive, conf, fs);
|
||||||
conf.set(MRJobConfig.CLASSPATH_ARCHIVES, classpath == null ? archive
|
|
||||||
.toString() : classpath + "," + archive.toString());
|
|
||||||
URI uri = fs.makeQualified(archive).toUri();
|
|
||||||
|
|
||||||
addCacheArchive(uri, conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -433,16 +386,7 @@ public static void addArchiveToClassPath(Path archive, Configuration conf)
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static Path[] getArchiveClassPaths(Configuration conf) {
|
public static Path[] getArchiveClassPaths(Configuration conf) {
|
||||||
ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
|
return JobContextImpl.getArchiveClassPaths(conf);
|
||||||
MRJobConfig.CLASSPATH_ARCHIVES);
|
|
||||||
if (list.size() == 0) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
Path[] paths = new Path[list.size()];
|
|
||||||
for (int i = 0; i < list.size(); i++) {
|
|
||||||
paths[i] = new Path(list.get(i));
|
|
||||||
}
|
|
||||||
return paths;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@ -43,6 +44,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
|
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A read-only view of the job that is provided to the tasks while they
|
* A read-only view of the job that is provided to the tasks while they
|
||||||
@ -305,7 +307,27 @@ public boolean getSymlink() {
|
|||||||
* Get the archive entries in classpath as an array of Path
|
* Get the archive entries in classpath as an array of Path
|
||||||
*/
|
*/
|
||||||
public Path[] getArchiveClassPaths() {
|
public Path[] getArchiveClassPaths() {
|
||||||
return DistributedCache.getArchiveClassPaths(conf);
|
return getArchiveClassPaths(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the archive entries in classpath as an array of Path.
|
||||||
|
* Used by internal DistributedCache code.
|
||||||
|
*
|
||||||
|
* @param conf Configuration that contains the classpath setting.
|
||||||
|
* @return An array of Path consisting of archive entries in classpath.
|
||||||
|
*/
|
||||||
|
public static Path[] getArchiveClassPaths(Configuration conf) {
|
||||||
|
ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
|
||||||
|
MRJobConfig.CLASSPATH_ARCHIVES);
|
||||||
|
if (list.size() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
Path[] paths = new Path[list.size()];
|
||||||
|
for (int i = 0; i < list.size(); i++) {
|
||||||
|
paths[i] = new Path(list.get(i));
|
||||||
|
}
|
||||||
|
return paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -314,7 +336,18 @@ public Path[] getArchiveClassPaths() {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public URI[] getCacheArchives() throws IOException {
|
public URI[] getCacheArchives() throws IOException {
|
||||||
return DistributedCache.getCacheArchives(conf);
|
return getCacheArchives(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get cache archives set in the Configuration. Used by
|
||||||
|
* internal DistributedCache and JobContextImpl code.
|
||||||
|
*
|
||||||
|
* @param conf The configuration which contains the archives.
|
||||||
|
* @return A URI array of the caches set in the Configuration.
|
||||||
|
*/
|
||||||
|
public static URI[] getCacheArchives(Configuration conf) {
|
||||||
|
return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_ARCHIVES));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -324,7 +357,18 @@ public URI[] getCacheArchives() throws IOException {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
public URI[] getCacheFiles() throws IOException {
|
public URI[] getCacheFiles() throws IOException {
|
||||||
return DistributedCache.getCacheFiles(conf);
|
return getCacheFiles(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get cache files set in the Configuration. Used by internal
|
||||||
|
* DistributedCache and MapReduce code.
|
||||||
|
*
|
||||||
|
* @param conf The configuration which contains the files.
|
||||||
|
* @return A URI array of the files set in the Configuration.
|
||||||
|
*/
|
||||||
|
public static URI[] getCacheFiles(Configuration conf) {
|
||||||
|
return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_FILES));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -334,7 +378,17 @@ public URI[] getCacheFiles() throws IOException {
|
|||||||
*/
|
*/
|
||||||
public Path[] getLocalCacheArchives()
|
public Path[] getLocalCacheArchives()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return DistributedCache.getLocalCacheArchives(conf);
|
return getLocalCacheArchives(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the path array of the localized caches.
|
||||||
|
*
|
||||||
|
* @param conf Configuration that contains the localized archives.
|
||||||
|
* @return A path array of localized caches.
|
||||||
|
*/
|
||||||
|
public static Path[] getLocalCacheArchives(Configuration conf) {
|
||||||
|
return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALARCHIVES));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -344,14 +398,82 @@ public Path[] getLocalCacheArchives()
|
|||||||
*/
|
*/
|
||||||
public Path[] getLocalCacheFiles()
|
public Path[] getLocalCacheFiles()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return DistributedCache.getLocalCacheFiles(conf);
|
return getLocalCacheFiles(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the path array of the localized files.
|
||||||
|
*
|
||||||
|
* @param conf Configuration that contains the localized files.
|
||||||
|
* @return A path array of localized files.
|
||||||
|
*/
|
||||||
|
public static Path[] getLocalCacheFiles(Configuration conf) {
|
||||||
|
return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse a list of strings into longs.
|
||||||
|
* @param strs the list of strings to parse
|
||||||
|
* @return a list of longs that were parsed. same length as strs.
|
||||||
|
*/
|
||||||
|
private static long[] parseTimestamps(String[] strs) {
|
||||||
|
if (strs == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
long[] result = new long[strs.length];
|
||||||
|
for(int i=0; i < strs.length; ++i) {
|
||||||
|
result[i] = Long.parseLong(strs[i]);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the timestamps of the archives. Used by internal
|
||||||
|
* DistributedCache and MapReduce code.
|
||||||
|
*
|
||||||
|
* @param conf The configuration which stored the timestamps.
|
||||||
|
* @return a long array of timestamps.
|
||||||
|
*/
|
||||||
|
public static long[] getArchiveTimestamps(Configuration conf) {
|
||||||
|
return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the timestamps of the files. Used by internal
|
||||||
|
* DistributedCache and MapReduce code.
|
||||||
|
*
|
||||||
|
* @param conf The configuration which stored the timestamps.
|
||||||
|
* @return a long array of timestamps.
|
||||||
|
*/
|
||||||
|
public static long[] getFileTimestamps(Configuration conf) {
|
||||||
|
return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the file entries in classpath as an array of Path
|
* Get the file entries in classpath as an array of Path
|
||||||
*/
|
*/
|
||||||
public Path[] getFileClassPaths() {
|
public Path[] getFileClassPaths() {
|
||||||
return DistributedCache.getFileClassPaths(conf);
|
return getFileClassPaths(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the file entries in classpath as an array of Path.
|
||||||
|
* Used by internal DistributedCache code.
|
||||||
|
*
|
||||||
|
* @param conf Configuration that contains the classpath setting.
|
||||||
|
* @return Array of Path consisting of file entries in the classpath.
|
||||||
|
*/
|
||||||
|
public static Path[] getFileClassPaths(Configuration conf) {
|
||||||
|
ArrayList<String> list =
|
||||||
|
(ArrayList<String>) conf.getStringCollection(MRJobConfig.CLASSPATH_FILES);
|
||||||
|
if (list.size() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
Path[] paths = new Path[list.size()];
|
||||||
|
for (int i = 0; i < list.size(); i++) {
|
||||||
|
paths[i] = new Path(list.get(i));
|
||||||
|
}
|
||||||
|
return paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -376,7 +498,7 @@ private static String[] toTimestampStrs(long[] timestamps) {
|
|||||||
* @return a string array of timestamps
|
* @return a string array of timestamps
|
||||||
*/
|
*/
|
||||||
public String[] getArchiveTimestamps() {
|
public String[] getArchiveTimestamps() {
|
||||||
return toTimestampStrs(DistributedCache.getArchiveTimestamps(conf));
|
return toTimestampStrs(getArchiveTimestamps(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -385,7 +507,7 @@ public String[] getArchiveTimestamps() {
|
|||||||
* @return a string array of timestamps
|
* @return a string array of timestamps
|
||||||
*/
|
*/
|
||||||
public String[] getFileTimestamps() {
|
public String[] getFileTimestamps() {
|
||||||
return toTimestampStrs(DistributedCache.getFileTimestamps(conf));
|
return toTimestampStrs(getFileTimestamps(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -26,15 +26,8 @@
|
|||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.JobClient;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
|
||||||
import org.apache.hadoop.mapred.Mapper;
|
|
||||||
import org.apache.hadoop.mapred.OutputCollector;
|
|
||||||
import org.apache.hadoop.mapred.Reducer;
|
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
|
||||||
import org.apache.hadoop.util.*;
|
import org.apache.hadoop.util.*;
|
||||||
import org.apache.hadoop.mapred.MapReduceBase;
|
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
@ -62,8 +55,8 @@ public static class MapClass extends MapReduceBase
|
|||||||
public void configure(JobConf jconf) {
|
public void configure(JobConf jconf) {
|
||||||
conf = jconf;
|
conf = jconf;
|
||||||
try {
|
try {
|
||||||
Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
|
Path[] localArchives = JobContextImpl.getLocalCacheArchives(conf);
|
||||||
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
|
Path[] localFiles = JobContextImpl.getLocalCacheFiles(conf);
|
||||||
// read the cached files (unzipped, unjarred and text)
|
// read the cached files (unzipped, unjarred and text)
|
||||||
// and put it into a single file TEST_ROOT_DIR/test.txt
|
// and put it into a single file TEST_ROOT_DIR/test.txt
|
||||||
String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
|
String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
|
||||||
@ -254,7 +247,7 @@ public static TestResult launchMRCache(String indir,
|
|||||||
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
|
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
|
||||||
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
|
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
|
||||||
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
|
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
|
||||||
DistributedCache.addCacheFile(uris[0], conf);
|
Job.addCacheFile(uris[0], conf);
|
||||||
|
|
||||||
// Save expected file sizes
|
// Save expected file sizes
|
||||||
long[] fileSizes = new long[1];
|
long[] fileSizes = new long[1];
|
||||||
@ -262,7 +255,7 @@ public static TestResult launchMRCache(String indir,
|
|||||||
|
|
||||||
long[] archiveSizes = new long[5]; // track last 5
|
long[] archiveSizes = new long[5]; // track last 5
|
||||||
for (int i = 1; i < 6; i++) {
|
for (int i = 1; i < 6; i++) {
|
||||||
DistributedCache.addCacheArchive(uris[i], conf);
|
Job.addCacheArchive(uris[i], conf);
|
||||||
archiveSizes[i-1] = // starting with second archive
|
archiveSizes[i-1] = // starting with second archive
|
||||||
fs.getFileStatus(new Path(uris[i].getPath())).getLen();
|
fs.getFileStatus(new Path(uris[i].getPath())).getLen();
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@
|
|||||||
import org.apache.hadoop.mapred.TextInputFormat;
|
import org.apache.hadoop.mapred.TextInputFormat;
|
||||||
import org.apache.hadoop.mapred.lib.IdentityMapper;
|
import org.apache.hadoop.mapred.lib.IdentityMapper;
|
||||||
import org.apache.hadoop.mapred.lib.IdentityReducer;
|
import org.apache.hadoop.mapred.lib.IdentityReducer;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -111,7 +111,7 @@ public void testCombinerShouldUpdateTheReporter() throws Exception {
|
|||||||
conf.setCombinerClass(MyCombinerToCheckReporter.class);
|
conf.setCombinerClass(MyCombinerToCheckReporter.class);
|
||||||
//conf.setJarByClass(MyCombinerToCheckReporter.class);
|
//conf.setJarByClass(MyCombinerToCheckReporter.class);
|
||||||
conf.setReducerClass(IdentityReducer.class);
|
conf.setReducerClass(IdentityReducer.class);
|
||||||
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
|
Job.addFileToClassPath(TestMRJobs.APP_JAR, conf, TestMRJobs.APP_JAR.getFileSystem(conf));
|
||||||
conf.setOutputCommitter(CustomOutputCommitter.class);
|
conf.setOutputCommitter(CustomOutputCommitter.class);
|
||||||
conf.setInputFormat(TextInputFormat.class);
|
conf.setInputFormat(TextInputFormat.class);
|
||||||
conf.setOutputKeyClass(LongWritable.class);
|
conf.setOutputKeyClass(LongWritable.class);
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||||
@ -969,10 +970,12 @@ protected void setJobConf() throws IOException {
|
|||||||
fail(LINK_URI);
|
fail(LINK_URI);
|
||||||
}
|
}
|
||||||
// set the jobconf for the caching parameters
|
// set the jobconf for the caching parameters
|
||||||
if (cacheArchives != null)
|
if (cacheArchives != null) {
|
||||||
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
|
Job.setCacheArchives(archiveURIs, jobConf_);
|
||||||
if (cacheFiles != null)
|
}
|
||||||
DistributedCache.setCacheFiles(fileURIs, jobConf_);
|
if (cacheFiles != null) {
|
||||||
|
Job.setCacheFiles(fileURIs, jobConf_);
|
||||||
|
}
|
||||||
|
|
||||||
if (verbose_) {
|
if (verbose_) {
|
||||||
listJobConfProperties();
|
listJobConfProperties();
|
||||||
|
Loading…
Reference in New Issue
Block a user