diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 2a14ec39c1..bcf73d1385 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -37,7 +37,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
@@ -82,7 +82,7 @@ class LocalDistributedCacheManager {
    * @param conf
    * @throws IOException
    */
-  public void setup(JobConf conf) throws IOException {
+  public void setup(JobConf conf, JobID jobId) throws IOException {
     File workDir = new File(System.getProperty("user.dir"));
 
     // Generate YARN local resources objects corresponding to the distributed
@@ -91,9 +91,7 @@ public void setup(JobConf conf) throws IOException {
         new LinkedHashMap<String, LocalResource>();
     MRApps.setupDistributedCache(conf, localResources);
     // Generating unique numbers for FSDownload.
-    AtomicLong uniqueNumberGenerator =
-        new AtomicLong(System.currentTimeMillis());
-
+
     // Find which resources are to be put on the local classpath
     Map<String, Path> classpaths = new HashMap<String, Path>();
     Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
@@ -124,9 +122,10 @@ public void setup(JobConf conf) throws IOException {
       Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
       Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
       for (LocalResource resource : localResources.values()) {
+        Path destPathForDownload = new Path(destPath,
+            jobId.toString() + "_" + UUID.randomUUID().toString());
         Callable<Path> download =
-            new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
-                Long.toString(uniqueNumberGenerator.incrementAndGet())),
+            new FSDownload(localFSFileContext, ugi, conf, destPathForDownload,
                 resource);
         Future<Path> future = exec.submit(download);
         resourcesToPaths.put(resource, future);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 5e7a2500dc..2ab4e76241 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -169,7 +169,7 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
       // Manage the distributed cache. If there are files to be copied,
       // this will trigger localFile to be re-written again.
       localDistributedCacheManager = new LocalDistributedCacheManager();
-      localDistributedCacheManager.setup(conf);
+      localDistributedCacheManager.setup(conf, jobid);
 
       // Write out configuration file. Instead of copying it from
       // systemJobFile, we re-write it, since setup(), above, may have
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
index d2814e962d..fa60e2df21 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -32,6 +32,13 @@
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -54,37 +61,37 @@
 
 @SuppressWarnings("deprecation")
 public class TestLocalDistributedCacheManager {
-  private static FileSystem mockfs; 
-  
+  private static FileSystem mockfs;
+
   public static class MockFileSystem extends FilterFileSystem {
     public MockFileSystem() {
       super(mockfs);
     }
   }
-  
+
   private File localDir;
-  
+
   private static void delete(File file) throws IOException {
-    if (file.getAbsolutePath().length() < 5) { 
+    if (file.getAbsolutePath().length() < 5) {
       throw new IllegalArgumentException(
           "Path [" + file + "] is too short, not deleting");
     }
-    if (file.exists()) { 
+    if (file.exists()) {
      if (file.isDirectory()) {
        File[] children = file.listFiles();
        if (children != null) {
          for (File child : children) {
            delete(child);
-          } 
-        } 
-      } 
+          }
+        }
+      }
       if (!file.delete()) {
         throw new RuntimeException(
             "Could not delete path [" + file + "]");
       }
     }
   }
-  
+
   @Before
   public void setup() throws Exception {
     mockfs = mock(FileSystem.class);
@@ -93,7 +100,7 @@ public void setup() throws Exception {
     delete(localDir);
     localDir.mkdirs();
   }
-  
+
   @After
   public void cleanup() throws Exception {
     delete(localDir);
@@ -120,9 +127,10 @@ public void seek(long position) {}
 
   @Test
   public void testDownload() throws Exception {
+    JobID jobId = new JobID();
     JobConf conf = new JobConf();
     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
-    
+
     URI mockBase = new URI("mock://test-nn1/");
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
@@ -137,14 +145,14 @@ public Path answer(InvocationOnMock args) throws Throwable {
     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
     final Path filePath = new Path(file);
     File link = new File("link");
-    
+
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
         Path p = (Path)args.getArguments()[0];
         if("file.txt".equals(p.getName())) {
-          return new FileStatus(201, false, 1, 500, 101, 101, 
-              FsPermission.getDefault(), "me", "me", filePath); 
+          return new FileStatus(201, false, 1, 500, 101, 101,
+              FsPermission.getDefault(), "me", "me", filePath);
         } else {
           throw new FileNotFoundException(p+" not supported by mocking");
         }
@@ -176,7 +184,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
     try {
-      manager.setup(conf);
+      manager.setup(conf, jobId);
       assertTrue(link.exists());
     } finally {
       manager.close();
     }
@@ -186,9 +194,10 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
 
   @Test
   public void testEmptyDownload() throws Exception {
+    JobID jobId = new JobID();
     JobConf conf = new JobConf();
     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
-    
+
     URI mockBase = new URI("mock://test-nn1/");
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
@@ -199,7 +208,7 @@ public Path answer(InvocationOnMock args) throws Throwable {
         return (Path) args.getArguments()[0];
       }
     });
-    
+
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
@@ -221,7 +230,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
     try {
-      manager.setup(conf);
+      manager.setup(conf, jobId);
     } finally {
       manager.close();
     }
@@ -230,9 +239,10 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
 
   @Test
   public void testDuplicateDownload() throws Exception {
+    JobID jobId = new JobID();
     JobConf conf = new JobConf();
     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
-    
+
     URI mockBase = new URI("mock://test-nn1/");
     when(mockfs.getUri()).thenReturn(mockBase);
     Path working = new Path("mock://test-nn1/user/me/");
@@ -247,14 +257,14 @@ public Path answer(InvocationOnMock args) throws Throwable {
     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
     final Path filePath = new Path(file);
     File link = new File("link");
-    
+
     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
       @Override
       public FileStatus answer(InvocationOnMock args) throws Throwable {
         Path p = (Path)args.getArguments()[0];
         if("file.txt".equals(p.getName())) {
-          return new FileStatus(201, false, 1, 500, 101, 101, 
-              FsPermission.getDefault(), "me", "me", filePath); 
+          return new FileStatus(201, false, 1, 500, 101, 101,
+              FsPermission.getDefault(), "me", "me", filePath);
         } else {
           throw new FileNotFoundException(p+" not supported by mocking");
         }
@@ -287,11 +297,48 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
     try {
-      manager.setup(conf);
+      manager.setup(conf, jobId);
       assertTrue(link.exists());
     } finally {
       manager.close();
     }
     assertFalse(link.exists());
   }
+
+  /**
+   * This test tries to replicate the issue seen with the previous version
+   * of {@link LocalDistributedCacheManager}, where the generated timestamp
+   * could be identical to the one in another process. Unfortunately, that
+   * behavior is difficult to mimic in a single-process unit test, and
+   * mocking the unique id (previously a timestamp, now a UUID) would not
+   * prove the validity of one approach over the other.
+   */
+  @Test
+  public void testMultipleCacheSetup() throws Exception {
+    JobID jobId = new JobID();
+    JobConf conf = new JobConf();
+    LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
+
+    final int threadCount = 10;
+    final CyclicBarrier barrier = new CyclicBarrier(threadCount);
+
+    ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
+    for (int i = 0; i < threadCount; ++i) {
+      setupCallable.add(() -> {
+        barrier.await();
+        manager.setup(conf, jobId);
+        return null;
+      });
+    }
+
+    ExecutorService ePool = Executors.newFixedThreadPool(threadCount);
+    try {
+      for (Future<Void> future : ePool.invokeAll(setupCallable)) {
+        future.get();
+      }
+    } finally {
+      ePool.shutdown();
+      manager.close();
+    }
+  }
 }
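
A note on the change (commentary, not part of the patch): the previous scheme
derived download-directory names from an AtomicLong seeded with
System.currentTimeMillis(). That counter is unique only within a single JVM;
two LocalJobRunner processes that start in the same millisecond can produce
the same sequence of names and then race on the same localized path. The new
scheme names each download <jobId>_<random UUID>, so uniqueness no longer
depends on process start time. A minimal standalone sketch of the two schemes
(the class name and the fixed example timestamp are invented for
illustration):

    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicLong;

    public class UniqueNameSketch {
      public static void main(String[] args) {
        // Old scheme: two JVMs seeding in the same millisecond generate
        // identical "unique" numbers for their first download.
        AtomicLong procA = new AtomicLong(1234567890000L);
        AtomicLong procB = new AtomicLong(1234567890000L);
        System.out.println(
            procA.incrementAndGet() == procB.incrementAndGet()); // true

        // New scheme: a per-download random UUID scoped by the job id;
        // collisions no longer depend on when each process started.
        String jobId = "job_local1_0001"; // hypothetical local job id
        String nameA = jobId + "_" + UUID.randomUUID();
        String nameB = jobId + "_" + UUID.randomUUID();
        System.out.println(nameA.equals(nameB)); // false
      }
    }

The new testMultipleCacheSetup releases all ten setup() calls through one
CyclicBarrier so the downloads begin as close to simultaneously as possible;
as its javadoc concedes, a single-process test cannot reproduce a
cross-process timestamp collision, so it only exercises the UUID-based naming
under contention.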