MAPREDUCE-4493. Distibuted Cache Compatability Issues (Robert Evans via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1367713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
75be1e0e76
commit
735b50e8bd
@ -196,7 +196,9 @@ Deprecated Properties
|
||||
*---+---+
|
||||
|mapred.compress.map.output | mapreduce.map.output.compress
|
||||
*---+---+
|
||||
|mapred.create.symlink | mapreduce.job.cache.symlink.create
|
||||
|mapred.create.symlink | NONE - symlinking is always on
|
||||
*---+---+
|
||||
|mapreduce.job.cache.symlink.create | NONE - symlinking is always on
|
||||
*---+---+
|
||||
|mapred.data.field.separator | mapreduce.fieldsel.data.field.separator
|
||||
*---+---+
|
||||
|
@ -771,6 +771,9 @@ Release 0.23.3 - UNRELEASED
|
||||
|
||||
MAPREDUCE-4496. AM logs link is missing user name (Jason Lowe via bobby)
|
||||
|
||||
MAPREDUCE-4493. Distibuted Cache Compatability Issues (Robert Evans
|
||||
via tgraves)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -84,7 +84,6 @@ class LocalDistributedCacheManager {
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setup(JobConf conf) throws IOException {
|
||||
boolean mkLinks = DistributedCache.getSymlink(conf);
|
||||
File workDir = new File(System.getProperty("user.dir"));
|
||||
|
||||
// Generate YARN local resources objects corresponding to the distributed
|
||||
@ -145,11 +144,9 @@ public void setup(JobConf conf) throws IOException {
|
||||
throw new IOException(e);
|
||||
}
|
||||
String pathString = path.toUri().toString();
|
||||
if(mkLinks) {
|
||||
String link = entry.getKey();
|
||||
String target = new File(path.toUri()).getPath();
|
||||
symlink(workDir, target, link);
|
||||
}
|
||||
String link = entry.getKey();
|
||||
String target = new File(path.toUri()).getPath();
|
||||
symlink(workDir, target, link);
|
||||
|
||||
if (resource.getType() == LocalResourceType.ARCHIVE) {
|
||||
localArchives.add(pathString);
|
||||
|
@ -150,7 +150,6 @@ public Object answer(InvocationOnMock args) throws Throwable {
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
|
||||
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
|
||||
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
try {
|
||||
manager.setup(conf);
|
||||
@ -197,7 +196,6 @@ public Object answer(InvocationOnMock args) throws Throwable {
|
||||
|
||||
conf.set(MRJobConfig.CACHE_FILES, "");
|
||||
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
|
||||
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
try {
|
||||
manager.setup(conf);
|
||||
@ -268,7 +266,6 @@ public Object answer(InvocationOnMock args) throws Throwable {
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");
|
||||
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
|
||||
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
try {
|
||||
manager.setup(conf);
|
||||
|
@ -146,7 +146,6 @@ private void testWithConf(Configuration conf) throws IOException,
|
||||
job.addFileToClassPath(second);
|
||||
job.addArchiveToClassPath(third);
|
||||
job.addCacheArchive(fourth.toUri());
|
||||
job.createSymlink();
|
||||
job.setMaxMapAttempts(1); // speed up failures
|
||||
|
||||
job.submit();
|
||||
|
@ -48,8 +48,12 @@
|
||||
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
|
||||
* Jars may be optionally added to the classpath of the tasks, a rudimentary
|
||||
* software distribution mechanism. Files have execution permissions.
|
||||
* Optionally users can also direct it to symlink the distributed cache file(s)
|
||||
* into the working directory of the task.</p>
|
||||
* In older version of Hadoop Map/Reduce users could optionally ask for symlinks
|
||||
* to be created in the working directory of the child task. In the current
|
||||
* version symlinks are always created. If the URL does not have a fragment
|
||||
* the name of the file or directory will be used. If multiple files or
|
||||
* directories map to the same link name, the last one added, will be used. All
|
||||
* others will not even be downloaded.</p>
|
||||
*
|
||||
* <p><code>DistributedCache</code> tracks modification timestamps of the cache
|
||||
* files. Clearly the cache files should not be modified by the application
|
||||
@ -91,8 +95,7 @@
|
||||
*
|
||||
* public void configure(JobConf job) {
|
||||
* // Get the cached archives/files
|
||||
* localArchives = DistributedCache.getLocalCacheArchives(job);
|
||||
* localFiles = DistributedCache.getLocalCacheFiles(job);
|
||||
* File f = new File("./map.zip/some/file/in/zip.txt");
|
||||
* }
|
||||
*
|
||||
* public void map(K key, V value,
|
||||
|
@ -313,7 +313,6 @@ private static void setupPipesJob(JobConf conf) throws IOException {
|
||||
// add default debug script only when executable is expressed as
|
||||
// <path>#<executable>
|
||||
if (exec.contains("#")) {
|
||||
DistributedCache.createSymlink(conf);
|
||||
// set default gdb commands for map and reduce task
|
||||
String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
|
||||
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
|
||||
|
@ -1049,9 +1049,10 @@ public void addArchiveToClassPath(Path archive)
|
||||
}
|
||||
|
||||
/**
|
||||
* This method allows you to create symlinks in the current working directory
|
||||
* of the task to all the cache files/archives
|
||||
* Originally intended to enable symlinks, but currently symlinks cannot be
|
||||
* disabled.
|
||||
*/
|
||||
@Deprecated
|
||||
public void createSymlink() {
|
||||
ensureState(JobState.DEFINE);
|
||||
DistributedCache.createSymlink(conf);
|
||||
|
@ -221,10 +221,11 @@ public Class<? extends Partitioner<?,?>> getPartitionerClass()
|
||||
public String getUser();
|
||||
|
||||
/**
|
||||
* This method checks to see if symlinks are to be create for the
|
||||
* localized cache files in the current working directory
|
||||
* @return true if symlinks are to be created- else return false
|
||||
* Originally intended to check if symlinks should be used, but currently
|
||||
* symlinks cannot be disabled.
|
||||
* @return true
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean getSymlink();
|
||||
|
||||
/**
|
||||
@ -251,14 +252,22 @@ public Class<? extends Partitioner<?,?>> getPartitionerClass()
|
||||
* Return the path array of the localized caches
|
||||
* @return A path array of localized caches
|
||||
* @throws IOException
|
||||
* @deprecated the array returned only includes the items the were
|
||||
* downloaded. There is no way to map this to what is returned by
|
||||
* {@link #getCacheArchives()}.
|
||||
*/
|
||||
@Deprecated
|
||||
public Path[] getLocalCacheArchives() throws IOException;
|
||||
|
||||
/**
|
||||
* Return the path array of the localized files
|
||||
* @return A path array of localized files
|
||||
* @throws IOException
|
||||
* @deprecated the array returned only includes the items the were
|
||||
* downloaded. There is no way to map this to what is returned by
|
||||
* {@link #getCacheFiles()}.
|
||||
*/
|
||||
@Deprecated
|
||||
public Path[] getLocalCacheFiles() throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -190,7 +190,6 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir,
|
||||
//should not throw a uri exception
|
||||
throw new IOException("Failed to create uri for " + tmpFile, ue);
|
||||
}
|
||||
DistributedCache.createSymlink(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@ -225,7 +224,6 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir,
|
||||
//should not throw an uri excpetion
|
||||
throw new IOException("Failed to create uri for " + tmpArchives, ue);
|
||||
}
|
||||
DistributedCache.createSymlink(conf);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,6 +114,10 @@ public interface MRJobConfig {
|
||||
|
||||
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
|
||||
|
||||
/**
|
||||
* @deprecated Symlinks are always on and cannot be disabled.
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
|
||||
|
||||
public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
|
||||
|
@ -55,8 +55,12 @@
|
||||
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
|
||||
* Jars may be optionally added to the classpath of the tasks, a rudimentary
|
||||
* software distribution mechanism. Files have execution permissions.
|
||||
* Optionally users can also direct it to symlink the distributed cache file(s)
|
||||
* into the working directory of the task.</p>
|
||||
* In older version of Hadoop Map/Reduce users could optionally ask for symlinks
|
||||
* to be created in the working directory of the child task. In the current
|
||||
* version symlinks are always created. If the URL does not have a fragment
|
||||
* the name of the file or directory will be used. If multiple files or
|
||||
* directories map to the same link name, the last one added, will be used. All
|
||||
* others will not even be downloaded.</p>
|
||||
*
|
||||
* <p><code>DistributedCache</code> tracks modification timestamps of the cache
|
||||
* files. Clearly the cache files should not be modified by the application
|
||||
@ -98,8 +102,7 @@
|
||||
*
|
||||
* public void configure(JobConf job) {
|
||||
* // Get the cached archives/files
|
||||
* localArchives = DistributedCache.getLocalCacheArchives(job);
|
||||
* localFiles = DistributedCache.getLocalCacheFiles(job);
|
||||
* File f = new File("./map.zip/some/file/in/zip.txt");
|
||||
* }
|
||||
*
|
||||
* public void map(K key, V value,
|
||||
@ -375,32 +378,26 @@ public static Path[] getArchiveClassPaths(Configuration conf) {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method allows you to create symlinks in the current working directory
|
||||
* of the task to all the cache files/archives.
|
||||
* Intended to be used by user code.
|
||||
* Originally intended to enable symlinks, but currently symlinks cannot be
|
||||
* disabled. This is a NO-OP.
|
||||
* @param conf the jobconf
|
||||
* @deprecated Use {@link Job#createSymlink()} instead
|
||||
* @deprecated This is a NO-OP.
|
||||
*/
|
||||
@Deprecated
|
||||
public static void createSymlink(Configuration conf){
|
||||
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
|
||||
//NOOP
|
||||
}
|
||||
|
||||
/**
|
||||
* This method checks to see if symlinks are to be create for the
|
||||
* localized cache files in the current working directory
|
||||
* Used by internal DistributedCache code.
|
||||
* Originally intended to check if symlinks should be used, but currently
|
||||
* symlinks cannot be disabled.
|
||||
* @param conf the jobconf
|
||||
* @return true if symlinks are to be created- else return false
|
||||
* @deprecated Use {@link JobContext#getSymlink()} instead
|
||||
* @return true
|
||||
* @deprecated symlinks are always created.
|
||||
*/
|
||||
@Deprecated
|
||||
public static boolean getSymlink(Configuration conf){
|
||||
String result = conf.get(MRJobConfig.CACHE_SYMLINK);
|
||||
if ("yes".equals(result)){
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
private static boolean[] parseBooleans(String[] strs) {
|
||||
|
@ -246,8 +246,6 @@ private static void addDeprecatedKeys() {
|
||||
new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
|
||||
Configuration.addDeprecation("mapred.cache.archives.timestamps",
|
||||
new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS});
|
||||
Configuration.addDeprecation("mapred.create.symlink",
|
||||
new String[] {MRJobConfig.CACHE_SYMLINK});
|
||||
Configuration.addDeprecation("mapred.working.dir",
|
||||
new String[] {MRJobConfig.WORKING_DIR});
|
||||
Configuration.addDeprecation("user.name",
|
||||
|
@ -210,19 +210,10 @@ static void setupCache(String cacheDir, FileSystem fs)
|
||||
fs.copyFromLocalFile(tarPath1, cachePath);
|
||||
fs.copyFromLocalFile(tarPath2, cachePath);
|
||||
}
|
||||
|
||||
public static TestResult launchMRCache(String indir,
|
||||
String outdir, String cacheDir,
|
||||
JobConf conf, String input)
|
||||
throws IOException {
|
||||
setupCache(cacheDir, FileSystem.get(conf));
|
||||
return launchMRCache(indir,outdir, cacheDir, conf, input, false);
|
||||
}
|
||||
|
||||
public static TestResult launchMRCache(String indir,
|
||||
String outdir, String cacheDir,
|
||||
JobConf conf, String input,
|
||||
boolean withSymlink)
|
||||
JobConf conf, String input)
|
||||
throws IOException {
|
||||
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
|
||||
.toString().replace(' ', '+');
|
||||
@ -256,24 +247,13 @@ public static TestResult launchMRCache(String indir,
|
||||
conf.setNumReduceTasks(1);
|
||||
conf.setSpeculativeExecution(false);
|
||||
URI[] uris = new URI[6];
|
||||
if (!withSymlink) {
|
||||
conf.setMapperClass(MRCaching.MapClass.class);
|
||||
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
|
||||
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
|
||||
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
|
||||
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
|
||||
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
|
||||
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
|
||||
} else {
|
||||
DistributedCache.createSymlink(conf);
|
||||
conf.setMapperClass(MRCaching.MapClass2.class);
|
||||
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
|
||||
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
|
||||
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
|
||||
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
|
||||
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
|
||||
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
|
||||
}
|
||||
conf.setMapperClass(MRCaching.MapClass2.class);
|
||||
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
|
||||
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
|
||||
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
|
||||
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
|
||||
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
|
||||
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
|
||||
DistributedCache.addCacheFile(uris[0], conf);
|
||||
|
||||
// Save expected file sizes
|
||||
|
@ -48,7 +48,7 @@ public void testWithDFS() throws IOException {
|
||||
"/cachedir",
|
||||
mr.createJobConf(),
|
||||
"The quick brown fox\nhas many silly\n"
|
||||
+ "red fox sox\n", false);
|
||||
+ "red fox sox\n");
|
||||
assertTrue("Archives not matching", ret.isOutputOk);
|
||||
// launch MR cache with symlinks
|
||||
ret = MRCaching.launchMRCache("/testing/wc/input",
|
||||
@ -56,7 +56,7 @@ public void testWithDFS() throws IOException {
|
||||
"/cachedir",
|
||||
mr.createJobConf(),
|
||||
"The quick brown fox\nhas many silly\n"
|
||||
+ "red fox sox\n", true);
|
||||
+ "red fox sox\n");
|
||||
assertTrue("Archives not matching", ret.isOutputOk);
|
||||
} finally {
|
||||
if (fileSys != null) {
|
||||
|
@ -211,6 +211,7 @@ public void testRandomWriter() throws IOException, InterruptedException,
|
||||
Path outputDir =
|
||||
new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
|
||||
FileOutputFormat.setOutputPath(job, outputDir);
|
||||
job.setSpeculativeExecution(false);
|
||||
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
||||
job.setJarByClass(RandomTextWriterJob.class);
|
||||
job.setMaxMapAttempts(1); // speed up failures
|
||||
@ -462,7 +463,6 @@ public void testDistributedCache() throws Exception {
|
||||
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
||||
job.addArchiveToClassPath(third);
|
||||
job.addCacheArchive(fourth.toUri());
|
||||
job.createSymlink();
|
||||
job.setMaxMapAttempts(1); // speed up failures
|
||||
|
||||
job.submit();
|
||||
|
@ -301,7 +301,6 @@ private Job runSpecTest(boolean mapspec, boolean redspec)
|
||||
|
||||
// Creates the Job Configuration
|
||||
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
||||
job.createSymlink();
|
||||
job.setMaxMapAttempts(2);
|
||||
|
||||
job.submit();
|
||||
|
@ -167,7 +167,6 @@ public int run(String[] args) throws Exception {
|
||||
URI partitionUri = new URI(partitionFile.toString() +
|
||||
"#" + "_sortPartitioning");
|
||||
DistributedCache.addCacheFile(partitionUri, conf);
|
||||
DistributedCache.createSymlink(conf);
|
||||
}
|
||||
|
||||
System.out.println("Running on " +
|
||||
|
@ -305,8 +305,7 @@ public int run(String[] args) throws Exception {
|
||||
LOG.error(e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
job.addCacheFile(partitionUri);
|
||||
job.createSymlink();
|
||||
job.addCacheFile(partitionUri);
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("Spent " + (end - start) + "ms computing partitions.");
|
||||
job.setPartitionerClass(TotalOrderPartitioner.class);
|
||||
|
@ -958,7 +958,6 @@ protected void setJobConf() throws IOException {
|
||||
if (!b)
|
||||
fail(LINK_URI);
|
||||
}
|
||||
DistributedCache.createSymlink(jobConf_);
|
||||
// set the jobconf for the caching parameters
|
||||
if (cacheArchives != null)
|
||||
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
|
||||
|
Loading…
Reference in New Issue
Block a user