MAPREDUCE-6441. Improve temporary directory name generation in LocalDistributedCacheManager for concurrent processes (wattsinabox, rchiang, haibochen via rkanter)
This commit is contained in:
parent
fcea5a4d79
commit
edb202e493
@ -37,7 +37,7 @@
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
@ -82,7 +82,7 @@ class LocalDistributedCacheManager {
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setup(JobConf conf) throws IOException {
|
||||
public void setup(JobConf conf, JobID jobId) throws IOException {
|
||||
File workDir = new File(System.getProperty("user.dir"));
|
||||
|
||||
// Generate YARN local resources objects corresponding to the distributed
|
||||
@ -91,8 +91,6 @@ public void setup(JobConf conf) throws IOException {
|
||||
new LinkedHashMap<String, LocalResource>();
|
||||
MRApps.setupDistributedCache(conf, localResources);
|
||||
// Generating unique numbers for FSDownload.
|
||||
AtomicLong uniqueNumberGenerator =
|
||||
new AtomicLong(System.currentTimeMillis());
|
||||
|
||||
// Find which resources are to be put on the local classpath
|
||||
Map<String, Path> classpaths = new HashMap<String, Path>();
|
||||
@ -124,9 +122,10 @@ public void setup(JobConf conf) throws IOException {
|
||||
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
|
||||
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
|
||||
for (LocalResource resource : localResources.values()) {
|
||||
Path destPathForDownload = new Path(destPath,
|
||||
jobId.toString() + "_" + UUID.randomUUID().toString());
|
||||
Callable<Path> download =
|
||||
new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
|
||||
Long.toString(uniqueNumberGenerator.incrementAndGet())),
|
||||
new FSDownload(localFSFileContext, ugi, conf, destPathForDownload,
|
||||
resource);
|
||||
Future<Path> future = exec.submit(download);
|
||||
resourcesToPaths.put(resource, future);
|
||||
|
@ -169,7 +169,7 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
|
||||
// Manage the distributed cache. If there are files to be copied,
|
||||
// this will trigger localFile to be re-written again.
|
||||
localDistributedCacheManager = new LocalDistributedCacheManager();
|
||||
localDistributedCacheManager.setup(conf);
|
||||
localDistributedCacheManager.setup(conf, jobid);
|
||||
|
||||
// Write out configuration file. Instead of copying it from
|
||||
// systemJobFile, we re-write it, since setup(), above, may have
|
||||
|
@ -32,6 +32,13 @@
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -120,6 +127,7 @@ public void seek(long position) {}
|
||||
|
||||
@Test
|
||||
public void testDownload() throws Exception {
|
||||
JobID jobId = new JobID();
|
||||
JobConf conf = new JobConf();
|
||||
conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
|
||||
|
||||
@ -176,7 +184,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
||||
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
try {
|
||||
manager.setup(conf);
|
||||
manager.setup(conf, jobId);
|
||||
assertTrue(link.exists());
|
||||
} finally {
|
||||
manager.close();
|
||||
@ -186,6 +194,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
||||
|
||||
@Test
|
||||
public void testEmptyDownload() throws Exception {
|
||||
JobID jobId = new JobID();
|
||||
JobConf conf = new JobConf();
|
||||
conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
|
||||
|
||||
@ -221,7 +230,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
||||
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
try {
|
||||
manager.setup(conf);
|
||||
manager.setup(conf, jobId);
|
||||
} finally {
|
||||
manager.close();
|
||||
}
|
||||
@ -230,6 +239,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
||||
|
||||
@Test
|
||||
public void testDuplicateDownload() throws Exception {
|
||||
JobID jobId = new JobID();
|
||||
JobConf conf = new JobConf();
|
||||
conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
|
||||
|
||||
@ -287,11 +297,48 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
|
||||
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
try {
|
||||
manager.setup(conf);
|
||||
manager.setup(conf, jobId);
|
||||
assertTrue(link.exists());
|
||||
} finally {
|
||||
manager.close();
|
||||
}
|
||||
assertFalse(link.exists());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test tries to replicate the issue with the previous version of
|
||||
* {@ref LocalDistributedCacheManager} when the resulting timestamp is
|
||||
* identical as that in another process. Unfortunately, it is difficult
|
||||
* to mimic such behavior in a single process unit test. And mocking
|
||||
* the unique id (timestamp previously, UUID otherwise) won't prove the
|
||||
* validity of one approach over the other.
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleCacheSetup() throws Exception {
|
||||
JobID jobId = new JobID();
|
||||
JobConf conf = new JobConf();
|
||||
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
|
||||
|
||||
final int threadCount = 10;
|
||||
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
|
||||
|
||||
ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
|
||||
for (int i = 0; i < threadCount; ++i) {
|
||||
setupCallable.add(() -> {
|
||||
barrier.await();
|
||||
manager.setup(conf, jobId);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
ExecutorService ePool = Executors.newFixedThreadPool(threadCount);
|
||||
try {
|
||||
for (Future<Void> future : ePool.invokeAll(setupCallable)) {
|
||||
future.get();
|
||||
}
|
||||
} finally {
|
||||
ePool.shutdown();
|
||||
manager.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user