YARN-706. Fixed race conditions in TestFSDownload. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1485028 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6377bfc505
commit
c665b0e86b
@ -399,6 +399,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||
ApplicationReportProto and fix bugs in ApplicationReportPBImpl. (Zhijie Shen
|
||||
via vinodkv)
|
||||
|
||||
YARN-706. Fixed race conditions in TestFSDownload. (Zhijie Shen via vinodkv).
|
||||
|
||||
Release 2.0.4-alpha - 2013-04-25
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarOutputStream;
|
||||
@ -238,6 +239,9 @@ public class TestFSDownload {
|
||||
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||
destPath, rsrc);
|
||||
pending.put(rsrc, exec.submit(fsd));
|
||||
exec.shutdown();
|
||||
while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(pending.get(rsrc).isDone());
|
||||
|
||||
try {
|
||||
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
||||
@ -246,8 +250,6 @@ public class TestFSDownload {
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
Assert.assertTrue(e.getCause() instanceof IOException);
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@ -295,6 +297,12 @@ public class TestFSDownload {
|
||||
pending.put(rsrc, exec.submit(fsd));
|
||||
}
|
||||
|
||||
exec.shutdown();
|
||||
while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
for (Future<Path> path: pending.values()) {
|
||||
Assert.assertTrue(path.isDone());
|
||||
}
|
||||
|
||||
try {
|
||||
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
||||
Path localized = p.getValue().get();
|
||||
@ -316,12 +324,9 @@ public class TestFSDownload {
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException("Failed exec", e);
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test (timeout=10000)
|
||||
public void testDownloadArchive() throws IOException, URISyntaxException,
|
||||
InterruptedException {
|
||||
@ -354,12 +359,15 @@ public class TestFSDownload {
|
||||
FSDownload fsd = new FSDownload(files,
|
||||
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
|
||||
pending.put(rsrc, exec.submit(fsd));
|
||||
exec.shutdown();
|
||||
while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(pending.get(rsrc).isDone());
|
||||
|
||||
try {
|
||||
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
||||
basedir);
|
||||
for (FileStatus filestatus : filesstatus) {
|
||||
if (filestatus.isDir()) {
|
||||
if (filestatus.isDirectory()) {
|
||||
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
||||
filestatus.getPath());
|
||||
for (FileStatus childfile : childFiles) {
|
||||
@ -373,12 +381,8 @@ public class TestFSDownload {
|
||||
}catch (Exception e) {
|
||||
throw new IOException("Failed exec", e);
|
||||
}
|
||||
finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
|
||||
@Test (timeout=10000)
|
||||
public void testDownloadPatternJar() throws IOException, URISyntaxException,
|
||||
InterruptedException {
|
||||
@ -412,12 +416,15 @@ public class TestFSDownload {
|
||||
FSDownload fsdjar = new FSDownload(files,
|
||||
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
|
||||
pending.put(rsrcjar, exec.submit(fsdjar));
|
||||
exec.shutdown();
|
||||
while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(pending.get(rsrcjar).isDone());
|
||||
|
||||
try {
|
||||
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
||||
basedir);
|
||||
for (FileStatus filestatus : filesstatus) {
|
||||
if (filestatus.isDir()) {
|
||||
if (filestatus.isDirectory()) {
|
||||
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
||||
filestatus.getPath());
|
||||
for (FileStatus childfile : childFiles) {
|
||||
@ -431,12 +438,8 @@ public class TestFSDownload {
|
||||
}catch (Exception e) {
|
||||
throw new IOException("Failed exec", e);
|
||||
}
|
||||
finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test (timeout=10000)
|
||||
public void testDownloadArchiveZip() throws IOException, URISyntaxException,
|
||||
InterruptedException {
|
||||
@ -469,12 +472,15 @@ public class TestFSDownload {
|
||||
FSDownload fsdzip = new FSDownload(files,
|
||||
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
|
||||
pending.put(rsrczip, exec.submit(fsdzip));
|
||||
exec.shutdown();
|
||||
while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(pending.get(rsrczip).isDone());
|
||||
|
||||
try {
|
||||
FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
|
||||
basedir);
|
||||
for (FileStatus filestatus : filesstatus) {
|
||||
if (filestatus.isDir()) {
|
||||
if (filestatus.isDirectory()) {
|
||||
FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
|
||||
filestatus.getPath());
|
||||
for (FileStatus childfile : childFiles) {
|
||||
@ -488,9 +494,6 @@ public class TestFSDownload {
|
||||
}catch (Exception e) {
|
||||
throw new IOException("Failed exec", e);
|
||||
}
|
||||
finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyPermsRecursively(FileSystem fs,
|
||||
@ -565,7 +568,13 @@ public class TestFSDownload {
|
||||
destPath, rsrc);
|
||||
pending.put(rsrc, exec.submit(fsd));
|
||||
}
|
||||
|
||||
|
||||
exec.shutdown();
|
||||
while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
for (Future<Path> path: pending.values()) {
|
||||
Assert.assertTrue(path.isDone());
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
|
||||
@ -581,15 +590,10 @@ public class TestFSDownload {
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException("Failed exec", e);
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
@Test (timeout=10000)
|
||||
public void testUniqueDestinationPath() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||
@ -606,20 +610,20 @@ public class TestFSDownload {
|
||||
destPath =
|
||||
new Path(destPath, Long.toString(uniqueNumberGenerator
|
||||
.incrementAndGet()));
|
||||
try {
|
||||
Path p = new Path(basedir, "dir" + 0 + ".jar");
|
||||
LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
||||
LocalResource rsrc = createJar(files, p, vis);
|
||||
FSDownload fsd =
|
||||
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||
destPath, rsrc);
|
||||
Future<Path> rPath = singleThreadedExec.submit(fsd);
|
||||
// Now FSDownload will not create a random directory to localize the
|
||||
// resource. Therefore the final localizedPath for the resource should be
|
||||
// destination directory (passed as an argument) + file name.
|
||||
Assert.assertEquals(destPath, rPath.get().getParent());
|
||||
} finally {
|
||||
singleThreadedExec.shutdown();
|
||||
}
|
||||
|
||||
Path p = new Path(basedir, "dir" + 0 + ".jar");
|
||||
LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
||||
LocalResource rsrc = createJar(files, p, vis);
|
||||
FSDownload fsd =
|
||||
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||
destPath, rsrc);
|
||||
Future<Path> rPath = singleThreadedExec.submit(fsd);
|
||||
singleThreadedExec.shutdown();
|
||||
while (!singleThreadedExec.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(rPath.isDone());
|
||||
// Now FSDownload will not create a random directory to localize the
|
||||
// resource. Therefore the final localizedPath for the resource should be
|
||||
// destination directory (passed as an argument) + file name.
|
||||
Assert.assertEquals(destPath, rPath.get().getParent());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user