From 08c803ea30321825f86fc8a431870ff03a65d379 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sat, 8 Jan 2022 20:48:10 +0530 Subject: [PATCH] MAPREDUCE-7371. DistributedCache alternative APIs should not use DistributedCache APIs internally (#3855) --- .../mapred/LocalDistributedCacheManager.java | 5 +- .../hadoop/mapreduce/v2/util/MRApps.java | 25 ++-- .../TestLocalDistributedCacheManager.java | 7 +- .../mapred/TestMRWithDistributedCache.java | 44 +++--- .../hadoop/mapreduce/v2/util/TestMRApps.java | 13 +- .../hadoop/mapred/pipes/Application.java | 4 +- .../apache/hadoop/mapred/pipes/Submitter.java | 7 +- .../java/org/apache/hadoop/mapreduce/Job.java | 121 ++++++++++++++- .../hadoop/mapreduce/JobResourceUploader.java | 13 +- .../apache/hadoop/mapreduce/JobSubmitter.java | 2 +- .../ClientDistributedCacheManager.java | 13 +- .../mapreduce/filecache/DistributedCache.java | 96 +++--------- .../hadoop/mapreduce/task/JobContextImpl.java | 138 +++++++++++++++++- .../org/apache/hadoop/mapred/MRCaching.java | 17 +-- .../mapreduce/v2/TestMRAppWithCombiner.java | 4 +- .../apache/hadoop/streaming/StreamJob.java | 11 +- 16 files changed, 344 insertions(+), 176 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java index 8f17ffda42..e5a50f013f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java @@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -95,13 +94,13 @@ public synchronized void setup(JobConf conf, JobID jobId) throws IOException { // Find which resources are to be put on the local classpath Map classpaths = new HashMap(); - Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf); + Path[] archiveClassPaths = JobContextImpl.getArchiveClassPaths(conf); if (archiveClassPaths != null) { for (Path p : archiveClassPaths) { classpaths.put(p.toUri().getPath().toString(), p); } } - Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf); + Path[] fileClassPaths = JobContextImpl.getFileClassPaths(conf); if (fileClassPaths != null) { for (Path p : fileClassPaths) { classpaths.put(p.toUri().getPath().toString(), p); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 010a4ed411..a3ccfd72d8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -272,12 +273,12 @@ public static void setClasspath(Map environment, crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf); // a * in the classpath will only find a .jar, so we need to filter out // all .jars and add everything else - addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), - DistributedCache.getCacheFiles(conf), + addToClasspathIfNotJar(JobContextImpl.getFileClassPaths(conf), + JobContextImpl.getCacheFiles(conf), conf, environment, classpathEnvVar); - addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), - DistributedCache.getCacheArchives(conf), + addToClasspathIfNotJar(JobContextImpl.getArchiveClassPaths(conf), + JobContextImpl.getCacheArchives(conf), conf, environment, classpathEnvVar); if (userClassesTakesPrecedence) { @@ -483,8 +484,8 @@ public static void setupDistributedCache(Configuration conf, // Cache archives lrb.setType(LocalResourceType.ARCHIVE); - lrb.setUris(DistributedCache.getCacheArchives(conf)); - lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf)); + lrb.setUris(JobContextImpl.getCacheArchives(conf)); + lrb.setTimestamps(JobContextImpl.getArchiveTimestamps(conf)); lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES)); lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf)); lrb.setSharedCacheUploadPolicies( @@ -493,8 +494,8 @@ public static void setupDistributedCache(Configuration conf, // Cache files lrb.setType(LocalResourceType.FILE); - lrb.setUris(DistributedCache.getCacheFiles(conf)); - lrb.setTimestamps(DistributedCache.getFileTimestamps(conf)); + lrb.setUris(JobContextImpl.getCacheFiles(conf)); + lrb.setTimestamps(JobContextImpl.getFileTimestamps(conf)); lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES)); lrb.setVisibilities(DistributedCache.getFileVisibilities(conf)); lrb.setSharedCacheUploadPolicies( @@ -504,9 +505,9 @@ public static void setupDistributedCache(Configuration conf, /** * Set up the DistributedCache related configs to make - * {@link DistributedCache#getLocalCacheFiles(Configuration)} + * {@link JobContextImpl#getLocalCacheFiles(Configuration)} * and - * {@link DistributedCache#getLocalCacheArchives(Configuration)} + * {@link JobContextImpl#getLocalCacheArchives(Configuration)} * working. * @param conf * @throws java.io.IOException @@ -518,7 +519,7 @@ public static void setupDistributedCacheLocal(Configuration conf) // ^ ^ all symlinks are created in the current work-dir // Update the configuration object with localized archives. - URI[] cacheArchives = DistributedCache.getCacheArchives(conf); + URI[] cacheArchives = JobContextImpl.getCacheArchives(conf); if (cacheArchives != null) { List localArchives = new ArrayList(); for (int i = 0; i < cacheArchives.length; ++i) { @@ -538,7 +539,7 @@ public static void setupDistributedCacheLocal(Configuration conf) } // Update the configuration object with localized files. - URI[] cacheFiles = DistributedCache.getCacheFiles(conf); + URI[] cacheFiles = JobContextImpl.getCacheFiles(conf); if (cacheFiles != null) { List localFiles = new ArrayList(); for (int i = 0; i < cacheFiles.length; ++i) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java index b565d23029..5d1c669e50 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java @@ -51,7 +51,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -174,7 +173,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable { } }); - DistributedCache.addCacheFile(file, conf); + Job.addCacheFile(file, conf); Map policies = new HashMap(); policies.put(file.toString(), true); Job.setFileSharedCacheUploadPolicies(conf, policies); @@ -286,8 +285,8 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable { } }); - DistributedCache.addCacheFile(file, conf); - DistributedCache.addCacheFile(file, conf); + Job.addCacheFile(file, conf); + Job.addCacheFile(file, conf); Map policies = new HashMap(); policies.put(file.toString(), true); Job.setFileSharedCacheUploadPolicies(conf, policies); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java index 2f0a6bdc00..62b8815e6f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java @@ -229,60 +229,60 @@ public void testDeprecatedFunctions() throws Exception { Assert.assertEquals("Test Local Archives 1", conf.get(DistributedCache.CACHE_LOCALARCHIVES)); Assert.assertEquals(1, - DistributedCache.getLocalCacheArchives(conf).length); + JobContextImpl.getLocalCacheArchives(conf).length); Assert.assertEquals("Test Local Archives 1", - DistributedCache.getLocalCacheArchives(conf)[0].getName()); + JobContextImpl.getLocalCacheArchives(conf)[0].getName()); DistributedCache.addLocalArchives(conf, "Test Local Archives 2"); Assert.assertEquals("Test Local Archives 1,Test Local Archives 2", conf.get(DistributedCache.CACHE_LOCALARCHIVES)); Assert.assertEquals(2, - DistributedCache.getLocalCacheArchives(conf).length); + JobContextImpl.getLocalCacheArchives(conf).length); Assert.assertEquals("Test Local Archives 2", - DistributedCache.getLocalCacheArchives(conf)[1].getName()); + JobContextImpl.getLocalCacheArchives(conf)[1].getName()); DistributedCache.setLocalArchives(conf, "Test Local Archives 3"); Assert.assertEquals("Test Local Archives 3", conf.get(DistributedCache.CACHE_LOCALARCHIVES)); Assert.assertEquals(1, - DistributedCache.getLocalCacheArchives(conf).length); + JobContextImpl.getLocalCacheArchives(conf).length); Assert.assertEquals("Test Local Archives 3", - DistributedCache.getLocalCacheArchives(conf)[0].getName()); + JobContextImpl.getLocalCacheArchives(conf)[0].getName()); DistributedCache.addLocalFiles(conf, "Test Local Files 1"); Assert.assertEquals("Test Local Files 1", conf.get(DistributedCache.CACHE_LOCALFILES)); Assert.assertEquals(1, - DistributedCache.getLocalCacheFiles(conf).length); + JobContextImpl.getLocalCacheFiles(conf).length); Assert.assertEquals("Test Local Files 1", - DistributedCache.getLocalCacheFiles(conf)[0].getName()); + JobContextImpl.getLocalCacheFiles(conf)[0].getName()); DistributedCache.addLocalFiles(conf, "Test Local Files 2"); Assert.assertEquals("Test Local Files 1,Test Local Files 2", conf.get(DistributedCache.CACHE_LOCALFILES)); Assert.assertEquals(2, - DistributedCache.getLocalCacheFiles(conf).length); + JobContextImpl.getLocalCacheFiles(conf).length); Assert.assertEquals("Test Local Files 2", - DistributedCache.getLocalCacheFiles(conf)[1].getName()); + JobContextImpl.getLocalCacheFiles(conf)[1].getName()); DistributedCache.setLocalFiles(conf, "Test Local Files 3"); Assert.assertEquals("Test Local Files 3", conf.get(DistributedCache.CACHE_LOCALFILES)); Assert.assertEquals(1, - DistributedCache.getLocalCacheFiles(conf).length); + JobContextImpl.getLocalCacheFiles(conf).length); Assert.assertEquals("Test Local Files 3", - DistributedCache.getLocalCacheFiles(conf)[0].getName()); + JobContextImpl.getLocalCacheFiles(conf)[0].getName()); DistributedCache.setArchiveTimestamps(conf, "1234567890"); Assert.assertEquals(1234567890, conf.getLong(DistributedCache.CACHE_ARCHIVES_TIMESTAMPS, 0)); Assert.assertEquals(1, - DistributedCache.getArchiveTimestamps(conf).length); + JobContextImpl.getArchiveTimestamps(conf).length); Assert.assertEquals(1234567890, - DistributedCache.getArchiveTimestamps(conf)[0]); + JobContextImpl.getArchiveTimestamps(conf)[0]); DistributedCache.setFileTimestamps(conf, "1234567890"); Assert.assertEquals(1234567890, conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0)); Assert.assertEquals(1, - DistributedCache.getFileTimestamps(conf).length); + JobContextImpl.getFileTimestamps(conf).length); Assert.assertEquals(1234567890, - DistributedCache.getFileTimestamps(conf)[0]); + JobContextImpl.getFileTimestamps(conf)[0]); DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"), new File("Test Work Dir")); @@ -297,18 +297,18 @@ public void testDeprecatedFunctions() throws Exception { DistributedCache.getTimestamp(conf, symlinkFile.toURI())); Assert.assertTrue(symlinkFile.delete()); - DistributedCache.addCacheArchive(symlinkFile.toURI(), conf); + Job.addCacheArchive(symlinkFile.toURI(), conf); Assert.assertEquals(symlinkFile.toURI().toString(), conf.get(DistributedCache.CACHE_ARCHIVES)); - Assert.assertEquals(1, DistributedCache.getCacheArchives(conf).length); + Assert.assertEquals(1, JobContextImpl.getCacheArchives(conf).length); Assert.assertEquals(symlinkFile.toURI(), - DistributedCache.getCacheArchives(conf)[0]); + JobContextImpl.getCacheArchives(conf)[0]); - DistributedCache.addCacheFile(symlinkFile.toURI(), conf); + Job.addCacheFile(symlinkFile.toURI(), conf); Assert.assertEquals(symlinkFile.toURI().toString(), conf.get(DistributedCache.CACHE_FILES)); - Assert.assertEquals(1, DistributedCache.getCacheFiles(conf).length); + Assert.assertEquals(1, JobContextImpl.getCacheFiles(conf).length); Assert.assertEquals(symlinkFile.toURI(), - DistributedCache.getCacheFiles(conf)[0]); + JobContextImpl.getCacheFiles(conf)[0]); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index c6a287439d..62235b7267 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -45,7 +45,6 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; @@ -377,11 +376,11 @@ public void testSetupDistributedCacheConflicts() throws Exception { when(mockFs.resolvePath(archivePath)).thenReturn(archivePath); when(mockFs.resolvePath(filePath)).thenReturn(filePath); - DistributedCache.addCacheArchive(archive, conf); + Job.addCacheArchive(archive, conf); conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10"); conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10"); conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true"); - DistributedCache.addCacheFile(file, conf); + Job.addCacheFile(file, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true"); @@ -416,8 +415,8 @@ public void testSetupDistributedCacheConflictsFiles() throws Exception { when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); - DistributedCache.addCacheFile(file, conf); - DistributedCache.addCacheFile(file2, conf); + Job.addCacheFile(file, conf); + Job.addCacheFile(file2, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true"); @@ -452,11 +451,11 @@ public void testSetupDistributedCache() throws Exception { when(mockFs.resolvePath(archivePath)).thenReturn(archivePath); when(mockFs.resolvePath(filePath)).thenReturn(filePath); - DistributedCache.addCacheArchive(archive, conf); + Job.addCacheArchive(archive, conf); conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10"); conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10"); conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true"); - DistributedCache.addCacheFile(file, conf); + Job.addCacheFile(file, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java index c8d33841f5..767dcd4514 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java @@ -48,11 +48,11 @@ import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.util.ReflectionUtils; @@ -117,7 +117,7 @@ class Application + * The access permissions of the file will determine whether the localized + * file will be shared across jobs. If the file is not readable by other or + * if any of its parent directories is not executable by other, then the + * file will not be shared. In the case of a path that ends in "/*", + * sharing of the localized files will be determined solely from the + * access permissions of the parent directories. The access permissions of + * the individual files will be ignored. + * + * @param uri The uri of the cache to be localized. + * @param conf Configuration to add the cache to. + */ + public static void addCacheFile(URI uri, Configuration conf) { + String files = conf.get(MRJobConfig.CACHE_FILES); + conf.set(MRJobConfig.CACHE_FILES, + files == null ? uri.toString() : files + "," + uri.toString()); } /** @@ -1165,7 +1224,39 @@ public void addCacheFile(URI uri) { public void addFileToClassPath(Path file) throws IOException { ensureState(JobState.DEFINE); - DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf)); + addFileToClassPath(file, conf, file.getFileSystem(conf)); + } + + /** + * Add a file path to the current set of classpath entries. The file will + * also be added to the cache. + * + * @param file Path of the file to be added. + * @param conf Configuration that contains the classpath setting. + * @param fs FileSystem with respect to which {@code file} should be interpreted. + */ + public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) { + addFileToClassPath(file, conf, fs, true); + } + + /** + * Add a file path to the current set of classpath entries. The file will + * also be added to the cache if {@code addToCache} is true. + * + * @param file Path of the file to be added. + * @param conf Configuration that contains the classpath setting. + * @param fs FileSystem with respect to which {@code file} should be interpreted. + * @param addToCache Whether the file should also be added to the cache list. + */ + public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs, + boolean addToCache) { + String classpath = conf.get(MRJobConfig.CLASSPATH_FILES); + conf.set(MRJobConfig.CLASSPATH_FILES, + classpath == null ? file.toString() : classpath + "," + file.toString()); + if (addToCache) { + URI uri = fs.makeQualified(file).toUri(); + Job.addCacheFile(uri, conf); + } } /** @@ -1180,7 +1271,23 @@ public void addFileToClassPath(Path file) public void addArchiveToClassPath(Path archive) throws IOException { ensureState(JobState.DEFINE); - DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); + addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); + } + + /** + * Add an archive path to the current set of classpath entries. It adds the + * archive to cache as well. + * + * @param archive Path of the archive to be added. + * @param conf Configuration that contains the classpath setting. + * @param fs FileSystem with respect to which {@code archive} should be interpreted. + */ + public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs) { + String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES); + conf.set(MRJobConfig.CLASSPATH_ARCHIVES, + classpath == null ? archive.toString() : classpath + "," + archive.toString()); + URI uri = fs.makeQualified(archive).toUri(); + Job.addCacheArchive(uri, conf); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index 2825497a42..e16a0d0bb0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -332,13 +332,12 @@ void uploadLibJars(Job job, Collection libjars, Path submitJobDir, // separately. foundFragment = (newURI.getFragment() != null) && !fromSharedCache; } - DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf, - jtFs, false); + Job.addFileToClassPath(new Path(newURI.getPath()), conf, jtFs, false); if (fromSharedCache) { // We simply add this URI to the distributed cache. It will not come // from the staging directory (it is in the shared cache), so we // must add it to the cache regardless of the wildcard feature. - DistributedCache.addCacheFile(newURI, conf); + Job.addCacheFile(newURI, conf); } else { libjarURIs.add(newURI); } @@ -352,10 +351,10 @@ void uploadLibJars(Job job, Collection libjars, Path submitJobDir, // Add the whole directory to the cache using a wild card Path libJarsDirWildcard = jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD)); - DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf); + Job.addCacheFile(libJarsDirWildcard.toUri(), conf); } else { for (URI uri : libjarURIs) { - DistributedCache.addCacheFile(uri, conf); + Job.addCacheFile(uri, conf); } } } @@ -847,8 +846,8 @@ private void copyLog4jPropertyFile(Job job, Path submitJobDir, } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication); - DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), - conf); + Path path = new Path(newPath.toUri().getPath()); + Job.addFileToClassPath(path, conf, path.getFileSystem(conf)); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index dd6c922046..4c983178a7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -466,7 +466,7 @@ private static void addMRFrameworkToDistributedCache(Configuration conf) throw new IllegalArgumentException(e); } - DistributedCache.addCacheArchive(uri, conf); + Job.addCacheArchive(uri, conf); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java index ada14db944..5e20bbe40d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.security.Credentials; /** @@ -82,7 +83,7 @@ public static void determineTimestampsAndCacheVisibilities(Configuration job, */ public static void determineTimestamps(Configuration job, Map statCache) throws IOException { - URI[] tarchives = DistributedCache.getCacheArchives(job); + URI[] tarchives = JobContextImpl.getCacheArchives(job); if (tarchives != null) { FileStatus status = getFileStatus(job, tarchives[0], statCache); StringBuilder archiveFileSizes = @@ -100,7 +101,7 @@ public static void determineTimestamps(Configuration job, setArchiveTimestamps(job, archiveTimestamps.toString()); } - URI[] tfiles = DistributedCache.getCacheFiles(job); + URI[] tfiles = JobContextImpl.getCacheFiles(job); if (tfiles != null) { FileStatus status = getFileStatus(job, tfiles[0], statCache); StringBuilder fileSizes = @@ -127,8 +128,8 @@ public static void determineTimestamps(Configuration job, */ public static void getDelegationTokens(Configuration job, Credentials credentials) throws IOException { - URI[] tarchives = DistributedCache.getCacheArchives(job); - URI[] tfiles = DistributedCache.getCacheFiles(job); + URI[] tarchives = JobContextImpl.getCacheArchives(job); + URI[] tfiles = JobContextImpl.getCacheFiles(job); int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0); Path[] ps = new Path[size]; @@ -159,7 +160,7 @@ public static void getDelegationTokens(Configuration job, */ public static void determineCacheVisibilities(Configuration job, Map statCache) throws IOException { - URI[] tarchives = DistributedCache.getCacheArchives(job); + URI[] tarchives = JobContextImpl.getCacheArchives(job); if (tarchives != null) { StringBuilder archiveVisibilities = new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache))); @@ -169,7 +170,7 @@ public static void determineCacheVisibilities(Configuration job, } setArchiveVisibilities(job, archiveVisibilities.toString()); } - URI[] tfiles = DistributedCache.getCacheFiles(job); + URI[] tfiles = JobContextImpl.getCacheFiles(job); if (tfiles != null) { StringBuilder fileVisibilities = new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java index 532e3ad3d7..dd11d490ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.util.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; @@ -145,8 +146,7 @@ public class DistributedCache { */ @Deprecated public static void setCacheArchives(URI[] archives, Configuration conf) { - String sarchives = StringUtils.uriToString(archives); - conf.set(MRJobConfig.CACHE_ARCHIVES, sarchives); + Job.setCacheArchives(archives, conf); } /** @@ -159,8 +159,7 @@ public static void setCacheArchives(URI[] archives, Configuration conf) { */ @Deprecated public static void setCacheFiles(URI[] files, Configuration conf) { - String sfiles = StringUtils.uriToString(files); - conf.set(MRJobConfig.CACHE_FILES, sfiles); + Job.setCacheFiles(files, conf); } /** @@ -174,7 +173,7 @@ public static void setCacheFiles(URI[] files, Configuration conf) { */ @Deprecated public static URI[] getCacheArchives(Configuration conf) throws IOException { - return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_ARCHIVES)); + return JobContextImpl.getCacheArchives(conf); } /** @@ -188,7 +187,7 @@ public static URI[] getCacheArchives(Configuration conf) throws IOException { */ @Deprecated public static URI[] getCacheFiles(Configuration conf) throws IOException { - return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_FILES)); + return JobContextImpl.getCacheFiles(conf); } /** @@ -201,10 +200,8 @@ public static URI[] getCacheFiles(Configuration conf) throws IOException { * @see JobContext#getLocalCacheArchives() */ @Deprecated - public static Path[] getLocalCacheArchives(Configuration conf) - throws IOException { - return StringUtils.stringToPath(conf - .getStrings(MRJobConfig.CACHE_LOCALARCHIVES)); + public static Path[] getLocalCacheArchives(Configuration conf) throws IOException { + return JobContextImpl.getLocalCacheArchives(conf); } /** @@ -219,23 +216,7 @@ public static Path[] getLocalCacheArchives(Configuration conf) @Deprecated public static Path[] getLocalCacheFiles(Configuration conf) throws IOException { - return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES)); - } - - /** - * Parse a list of strings into longs. - * @param strs the list of strings to parse - * @return a list of longs that were parsed. same length as strs. - */ - private static long[] parseTimestamps(String[] strs) { - if (strs == null) { - return null; - } - long[] result = new long[strs.length]; - for(int i=0; i < strs.length; ++i) { - result[i] = Long.parseLong(strs[i]); - } - return result; + return JobContextImpl.getLocalCacheFiles(conf); } /** @@ -248,8 +229,7 @@ private static long[] parseTimestamps(String[] strs) { */ @Deprecated public static long[] getArchiveTimestamps(Configuration conf) { - return parseTimestamps( - conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS)); + return JobContextImpl.getArchiveTimestamps(conf); } @@ -263,8 +243,7 @@ public static long[] getArchiveTimestamps(Configuration conf) { */ @Deprecated public static long[] getFileTimestamps(Configuration conf) { - return parseTimestamps( - conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS)); + return JobContextImpl.getFileTimestamps(conf); } /** @@ -277,9 +256,7 @@ public static long[] getFileTimestamps(Configuration conf) { */ @Deprecated public static void addCacheArchive(URI uri, Configuration conf) { - String archives = conf.get(MRJobConfig.CACHE_ARCHIVES); - conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString() - : archives + "," + uri.toString()); + Job.addCacheArchive(uri, conf); } /** @@ -307,9 +284,7 @@ public static void addCacheArchive(URI uri, Configuration conf) { */ @Deprecated public static void addCacheFile(URI uri, Configuration conf) { - String files = conf.get(MRJobConfig.CACHE_FILES); - conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + "," - + uri.toString()); + Job.addCacheFile(uri, conf); } /** @@ -323,9 +298,8 @@ public static void addCacheFile(URI uri, Configuration conf) { * @see Job#addFileToClassPath(Path) */ @Deprecated - public static void addFileToClassPath(Path file, Configuration conf) - throws IOException { - addFileToClassPath(file, conf, file.getFileSystem(conf)); + public static void addFileToClassPath(Path file, Configuration conf) throws IOException { + Job.addFileToClassPath(file, conf, file.getFileSystem(conf)); } /** @@ -340,7 +314,7 @@ public static void addFileToClassPath(Path file, Configuration conf) */ public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) { - addFileToClassPath(file, conf, fs, true); + Job.addFileToClassPath(file, conf, fs, true); } /** @@ -357,14 +331,7 @@ public static void addFileToClassPath(Path file, Configuration conf, */ public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs, boolean addToCache) { - String classpath = conf.get(MRJobConfig.CLASSPATH_FILES); - conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString() - : classpath + "," + file.toString()); - - if (addToCache) { - URI uri = fs.makeQualified(file).toUri(); - addCacheFile(uri, conf); - } + Job.addFileToClassPath(file, conf, fs, addToCache); } /** @@ -377,16 +344,7 @@ public static void addFileToClassPath(Path file, Configuration conf, */ @Deprecated public static Path[] getFileClassPaths(Configuration conf) { - ArrayList list = (ArrayList)conf.getStringCollection( - MRJobConfig.CLASSPATH_FILES); - if (list.size() == 0) { - return null; - } - Path[] paths = new Path[list.size()]; - for (int i = 0; i < list.size(); i++) { - paths[i] = new Path(list.get(i)); - } - return paths; + return JobContextImpl.getFileClassPaths(conf); } /** @@ -401,7 +359,7 @@ public static Path[] getFileClassPaths(Configuration conf) { @Deprecated public static void addArchiveToClassPath(Path archive, Configuration conf) throws IOException { - addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); + Job.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); } /** @@ -415,12 +373,7 @@ public static void addArchiveToClassPath(Path archive, Configuration conf) public static void addArchiveToClassPath (Path archive, Configuration conf, FileSystem fs) throws IOException { - String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES); - conf.set(MRJobConfig.CLASSPATH_ARCHIVES, classpath == null ? archive - .toString() : classpath + "," + archive.toString()); - URI uri = fs.makeQualified(archive).toUri(); - - addCacheArchive(uri, conf); + Job.addArchiveToClassPath(archive, conf, fs); } /** @@ -433,16 +386,7 @@ public static void addArchiveToClassPath(Path archive, Configuration conf) */ @Deprecated public static Path[] getArchiveClassPaths(Configuration conf) { - ArrayList list = (ArrayList)conf.getStringCollection( - MRJobConfig.CLASSPATH_ARCHIVES); - if (list.size() == 0) { - return null; - } - Path[] paths = new Path[list.size()]; - for (int i = 0; i < list.size(); i++) { - paths[i] = new Path(list.get(i)); - } - return paths; + return JobContextImpl.getArchiveClassPaths(conf); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java index 1696246b84..41d03bb002 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; /** * A read-only view of the job that is provided to the tasks while they @@ -305,7 +307,27 @@ public boolean getSymlink() { * Get the archive entries in classpath as an array of Path */ public Path[] getArchiveClassPaths() { - return DistributedCache.getArchiveClassPaths(conf); + return getArchiveClassPaths(conf); + } + + /** + * Get the archive entries in classpath as an array of Path. + * Used by internal DistributedCache code. + * + * @param conf Configuration that contains the classpath setting. + * @return An array of Path consisting of archive entries in classpath. + */ + public static Path[] getArchiveClassPaths(Configuration conf) { + ArrayList list = (ArrayList)conf.getStringCollection( + MRJobConfig.CLASSPATH_ARCHIVES); + if (list.size() == 0) { + return null; + } + Path[] paths = new Path[list.size()]; + for (int i = 0; i < list.size(); i++) { + paths[i] = new Path(list.get(i)); + } + return paths; } /** @@ -314,7 +336,18 @@ public Path[] getArchiveClassPaths() { * @throws IOException */ public URI[] getCacheArchives() throws IOException { - return DistributedCache.getCacheArchives(conf); + return getCacheArchives(conf); + } + + /** + * Get cache archives set in the Configuration. Used by + * internal DistributedCache and JobContextImpl code. + * + * @param conf The configuration which contains the archives. + * @return A URI array of the caches set in the Configuration. + */ + public static URI[] getCacheArchives(Configuration conf) { + return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_ARCHIVES)); } /** @@ -324,7 +357,18 @@ public URI[] getCacheArchives() throws IOException { */ public URI[] getCacheFiles() throws IOException { - return DistributedCache.getCacheFiles(conf); + return getCacheFiles(conf); + } + + /** + * Get cache files set in the Configuration. Used by internal + * DistributedCache and MapReduce code. + * + * @param conf The configuration which contains the files. + * @return A URI array of the files set in the Configuration. + */ + public static URI[] getCacheFiles(Configuration conf) { + return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_FILES)); } /** @@ -334,7 +378,17 @@ public URI[] getCacheFiles() throws IOException { */ public Path[] getLocalCacheArchives() throws IOException { - return DistributedCache.getLocalCacheArchives(conf); + return getLocalCacheArchives(conf); + } + + /** + * Return the path array of the localized caches. + * + * @param conf Configuration that contains the localized archives. + * @return A path array of localized caches. + */ + public static Path[] getLocalCacheArchives(Configuration conf) { + return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALARCHIVES)); } /** @@ -344,14 +398,82 @@ public Path[] getLocalCacheArchives() */ public Path[] getLocalCacheFiles() throws IOException { - return DistributedCache.getLocalCacheFiles(conf); + return getLocalCacheFiles(conf); + } + + /** + * Return the path array of the localized files. + * + * @param conf Configuration that contains the localized files. + * @return A path array of localized files. + */ + public static Path[] getLocalCacheFiles(Configuration conf) { + return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES)); + } + + /** + * Parse a list of strings into longs. + * @param strs the list of strings to parse + * @return a list of longs that were parsed. same length as strs. + */ + private static long[] parseTimestamps(String[] strs) { + if (strs == null) { + return null; + } + long[] result = new long[strs.length]; + for(int i=0; i < strs.length; ++i) { + result[i] = Long.parseLong(strs[i]); + } + return result; + } + + /** + * Get the timestamps of the archives. Used by internal + * DistributedCache and MapReduce code. + * + * @param conf The configuration which stored the timestamps. + * @return a long array of timestamps. + */ + public static long[] getArchiveTimestamps(Configuration conf) { + return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS)); + } + + /** + * Get the timestamps of the files. Used by internal + * DistributedCache and MapReduce code. + * + * @param conf The configuration which stored the timestamps. + * @return a long array of timestamps. + */ + public static long[] getFileTimestamps(Configuration conf) { + return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS)); } /** * Get the file entries in classpath as an array of Path */ public Path[] getFileClassPaths() { - return DistributedCache.getFileClassPaths(conf); + return getFileClassPaths(conf); + } + + /** + * Get the file entries in classpath as an array of Path. + * Used by internal DistributedCache code. + * + * @param conf Configuration that contains the classpath setting. + * @return Array of Path consisting of file entries in the classpath. + */ + public static Path[] getFileClassPaths(Configuration conf) { + ArrayList list = + (ArrayList) conf.getStringCollection(MRJobConfig.CLASSPATH_FILES); + if (list.size() == 0) { + return null; + } + Path[] paths = new Path[list.size()]; + for (int i = 0; i < list.size(); i++) { + paths[i] = new Path(list.get(i)); + } + return paths; } /** @@ -376,7 +498,7 @@ private static String[] toTimestampStrs(long[] timestamps) { * @return a string array of timestamps */ public String[] getArchiveTimestamps() { - return toTimestampStrs(DistributedCache.getArchiveTimestamps(conf)); + return toTimestampStrs(getArchiveTimestamps(conf)); } /** @@ -385,7 +507,7 @@ public String[] getArchiveTimestamps() { * @return a string array of timestamps */ public String[] getFileTimestamps() { - return toTimestampStrs(DistributedCache.getFileTimestamps(conf)); + return toTimestampStrs(getFileTimestamps(conf)); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java index 718dbee014..a71550bce8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java @@ -26,15 +26,8 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.*; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.MRJobConfig; import java.net.URI; @@ -62,8 +55,8 @@ public static class MapClass extends MapReduceBase public void configure(JobConf jconf) { conf = jconf; try { - Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); - Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); + Path[] localArchives = JobContextImpl.getLocalCacheArchives(conf); + Path[] localFiles = JobContextImpl.getLocalCacheFiles(conf); // read the cached files (unzipped, unjarred and text) // and put it into a single file TEST_ROOT_DIR/test.txt String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp"); @@ -254,7 +247,7 @@ public static TestResult launchMRCache(String indir, uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz"); uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz"); uris[5] = fs.getUri().resolve(cacheDir + "/test.tar"); - DistributedCache.addCacheFile(uris[0], conf); + Job.addCacheFile(uris[0], conf); // Save expected file sizes long[] fileSizes = new long[1]; @@ -262,7 +255,7 @@ public static TestResult launchMRCache(String indir, long[] archiveSizes = new long[5]; // track last 5 for (int i = 1; i < 6; i++) { - DistributedCache.addCacheArchive(uris[i], conf); + Job.addCacheArchive(uris[i], conf); archiveSizes[i-1] = // starting with second archive fs.getFileStatus(new Path(uris[i].getPath())).getLen(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java index 0f3916824a..10cb9b85b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java @@ -40,7 +40,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.Job; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -111,7 +111,7 @@ public void testCombinerShouldUpdateTheReporter() throws Exception { conf.setCombinerClass(MyCombinerToCheckReporter.class); //conf.setJarByClass(MyCombinerToCheckReporter.class); conf.setReducerClass(IdentityReducer.class); - DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf); + Job.addFileToClassPath(TestMRJobs.APP_JAR, conf, TestMRJobs.APP_JAR.getFileSystem(conf)); conf.setOutputCommitter(CustomOutputCommitter.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputKeyClass(LongWritable.class); diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java index 551a5f2dc4..4f9d820c12 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; @@ -969,10 +970,12 @@ protected void setJobConf() throws IOException { fail(LINK_URI); } // set the jobconf for the caching parameters - if (cacheArchives != null) - DistributedCache.setCacheArchives(archiveURIs, jobConf_); - if (cacheFiles != null) - DistributedCache.setCacheFiles(fileURIs, jobConf_); + if (cacheArchives != null) { + Job.setCacheArchives(archiveURIs, jobConf_); + } + if (cacheFiles != null) { + Job.setCacheFiles(fileURIs, jobConf_); + } if (verbose_) { listJobConfProperties();