MAPREDUCE-2409. DistributedCache maps files and archives to the same path,

despite semantic incompatibility. Contributed by Siddharth Seth


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1149004 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2011-07-21 01:59:14 +00:00
parent 08928d067b
commit 3810266eae
4 changed files with 72 additions and 25 deletions

View File

@ -344,6 +344,9 @@ Trunk (unreleased changes)
MAPREDUCE-2710. Update JobSubmitter.printTokens(..) for HDFS-2161. MAPREDUCE-2710. Update JobSubmitter.printTokens(..) for HDFS-2161.
(szetszwo) (szetszwo)
MAPREDUCE-2409. DistributedCache maps files and archives to the same path,
despite semantic incompatibility. (Siddharth Seth via cdouglas)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -241,7 +241,7 @@ public void release() throws IOException {
for (CacheFile c : cacheFiles) { for (CacheFile c : cacheFiles) {
if (c.getLocalized()) { if (c.getLocalized()) {
distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp, distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
c.owner); c.owner, CacheFile.FileType.ARCHIVE == c.type);
} }
} }
} }

View File

@ -158,9 +158,9 @@ Path getLocalCache(URI cache, Configuration conf,
Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic) Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
throws IOException { throws IOException {
String key; String key;
key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic)); key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic),
isArchive);
CacheStatus lcacheStatus; CacheStatus lcacheStatus;
Path localizedPath = null;
synchronized (cachedArchives) { synchronized (cachedArchives) {
lcacheStatus = cachedArchives.get(key); lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) { if (lcacheStatus == null) {
@ -187,18 +187,18 @@ Path getLocalCache(URI cache, Configuration conf,
FileSystem fs = FileSystem.get(cache, conf); FileSystem fs = FileSystem.get(cache, conf);
checkStampSinceJobStarted(conf, fs, cache, confFileStamp, checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
lcacheStatus, fileStatus); lcacheStatus, fileStatus);
localizedPath = localizeCache(conf, cache, confFileStamp, localizeCache(conf, cache, confFileStamp,
lcacheStatus, isArchive, isPublic); lcacheStatus, isArchive, isPublic);
lcacheStatus.initComplete(); lcacheStatus.initComplete();
} else { } else {
localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp, checkCacheStatusValidity(conf, cache, confFileStamp,
lcacheStatus, fileStatus, isArchive); lcacheStatus, fileStatus, isArchive);
} }
createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir, createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
honorSymLinkConf); honorSymLinkConf);
} }
initSuccessful = true; initSuccessful = true;
return localizedPath; return lcacheStatus.localizedLoadPath;
} finally { } finally {
if (!initSuccessful) { if (!initSuccessful) {
lcacheStatus.decRefCount(); lcacheStatus.decRefCount();
@ -217,8 +217,8 @@ Path getLocalCache(URI cache, Configuration conf,
* @throws IOException * @throws IOException
*/ */
void releaseCache(URI cache, Configuration conf, long timeStamp, void releaseCache(URI cache, Configuration conf, long timeStamp,
String owner) throws IOException { String owner, boolean isArchive) throws IOException {
String key = getKey(cache, conf, timeStamp, owner); String key = getKey(cache, conf, timeStamp, owner, isArchive);
synchronized (cachedArchives) { synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(key); CacheStatus lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) { if (lcacheStatus == null) {
@ -236,8 +236,8 @@ void releaseCache(URI cache, Configuration conf, long timeStamp,
* This method is called from unit tests. * This method is called from unit tests.
*/ */
int getReferenceCount(URI cache, Configuration conf, long timeStamp, int getReferenceCount(URI cache, Configuration conf, long timeStamp,
String owner) throws IOException { String owner, boolean isArchive) throws IOException {
String key = getKey(cache, conf, timeStamp, owner); String key = getKey(cache, conf, timeStamp, owner, isArchive);
synchronized (cachedArchives) { synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(key); CacheStatus lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) { if (lcacheStatus == null) {
@ -315,9 +315,10 @@ String makeRelative(URI cache, Configuration conf)
return path; return path;
} }
String getKey(URI cache, Configuration conf, long timeStamp, String user) String getKey(URI cache, Configuration conf, long timeStamp, String user,
throws IOException { boolean isArchive) throws IOException {
return makeRelative(cache, conf) + String.valueOf(timeStamp) + user; return (isArchive ? "a" : "f") + "^" + makeRelative(cache, conf)
+ String.valueOf(timeStamp) + user;
} }
/** /**
@ -341,12 +342,11 @@ static FileStatus getFileStatus(Configuration conf, URI cache)
* @return mtime of a given cache file on hdfs * @return mtime of a given cache file on hdfs
* @throws IOException * @throws IOException
*/ */
static long getTimestamp(Configuration conf, URI cache) long getTimestamp(Configuration conf, URI cache) throws IOException {
throws IOException {
return getFileStatus(conf, cache).getModificationTime(); return getFileStatus(conf, cache).getModificationTime();
} }
private Path checkCacheStatusValidity(Configuration conf, void checkCacheStatusValidity(Configuration conf,
URI cache, long confFileStamp, URI cache, long confFileStamp,
CacheStatus cacheStatus, CacheStatus cacheStatus,
FileStatus fileStatus, FileStatus fileStatus,
@ -362,7 +362,6 @@ private Path checkCacheStatusValidity(Configuration conf,
LOG.info(String.format("Using existing cache of %s->%s", LOG.info(String.format("Using existing cache of %s->%s",
cache.toString(), cacheStatus.localizedLoadPath)); cache.toString(), cacheStatus.localizedLoadPath));
return cacheStatus.localizedLoadPath;
} }
private void createSymlink(Configuration conf, URI cache, private void createSymlink(Configuration conf, URI cache,
@ -472,7 +471,7 @@ private static boolean isTarFile(String filename) {
} }
// ensure that the file on hdfs hasn't been modified since the job started // ensure that the file on hdfs hasn't been modified since the job started
private long checkStampSinceJobStarted(Configuration conf, FileSystem fs, long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
URI cache, long confFileStamp, URI cache, long confFileStamp,
CacheStatus lcacheStatus, CacheStatus lcacheStatus,
FileStatus fileStatus) FileStatus fileStatus)

View File

@ -51,11 +51,15 @@
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.mortbay.log.Log; import org.mortbay.log.Log;
import org.mockito.Matchers;
import static org.mockito.Mockito.*;
public class TestTrackerDistributedCacheManager extends TestCase { public class TestTrackerDistributedCacheManager extends TestCase {
protected String TEST_ROOT_DIR = protected String TEST_ROOT_DIR =
@ -251,7 +255,7 @@ public void testReferenceCount() throws IOException, LoginException,
handle.release(); handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) { for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp, assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
c.owner)); c.owner, false));
} }
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile"); Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@ -289,7 +293,7 @@ public void testReferenceCount() throws IOException, LoginException,
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) { for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
try { try {
assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp, assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
c.owner)); c.owner, false));
} catch (IOException ie) { } catch (IOException ie) {
th = ie; th = ie;
Log.info("Exception getting reference count for " + c.uri, ie); Log.info("Exception getting reference count for " + c.uri, ie);
@ -609,15 +613,15 @@ public void testLRUDeleteCache() throws Exception {
manager.releaseCache(thirdCacheFile.toUri(), conf2, manager.releaseCache(thirdCacheFile.toUri(), conf2,
getFileStamp(thirdCacheFile), getFileStamp(thirdCacheFile),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
manager.releaseCache(secondCacheFile.toUri(), conf2, manager.releaseCache(secondCacheFile.toUri(), conf2,
getFileStamp(secondCacheFile), getFileStamp(secondCacheFile),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
manager.releaseCache(firstCacheFile.toUri(), conf2, manager.releaseCache(firstCacheFile.toUri(), conf2,
getFileStamp(firstCacheFile), getFileStamp(firstCacheFile),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
// Getting the fourth cache will make the number of sub directories becomes // Getting the fourth cache will make the number of sub directories becomes
@ -645,6 +649,47 @@ public void testLRUDeleteCache() throws Exception {
manager.stopCleanupThread(); manager.stopCleanupThread();
} }
public void testSameNameFileArchiveCache() throws IOException,
URISyntaxException, InterruptedException {
if (!canRun()) {
return;
}
TrackerDistributedCacheManager manager =
spy(new TrackerDistributedCacheManager(conf, taskController));
URI rsrc = new URI("file://foo/bar/yak");
Path cacheDir = new Path("file:///localcache");
Path archivePath = new Path(cacheDir, "archive");
Path filePath = new Path(cacheDir, "file");
doReturn(archivePath).when(manager).localizeCache(eq(conf), eq(rsrc),
anyLong(), Matchers.<CacheStatus> anyObject(), eq(true), anyBoolean());
doReturn(filePath).when(manager).localizeCache(eq(conf), eq(rsrc),
anyLong(), Matchers.<CacheStatus> anyObject(), eq(false), anyBoolean());
// could fail, but check match instead
doNothing().when(manager).checkCacheStatusValidity(
Matchers.<Configuration> anyObject(), eq(rsrc), anyLong(),
Matchers.<CacheStatus> anyObject(), Matchers.<FileStatus> anyObject(),
anyBoolean());
// localizeCache initializes mtime of cached rsrc; set to uninitialized val
doReturn(-1L).when(manager).checkStampSinceJobStarted(
Matchers.<Configuration> anyObject(),
Matchers.<FileSystem> anyObject(), eq(rsrc), anyLong(),
Matchers.<CacheStatus> anyObject(), Matchers.<FileStatus> anyObject());
doReturn(-1L).when(manager).getTimestamp(
Matchers.<Configuration> anyObject(), eq(rsrc));
FileStatus rsrcStatus = mock(FileStatus.class);
when(rsrcStatus.getLen()).thenReturn(4344L);
Path localizedPathForFile =
manager.getLocalCache(rsrc, conf, "sub", rsrcStatus, false, 20L,
new Path("file:///tmp"), false, true);
Path localizedPathForArchive =
manager.getLocalCache(rsrc, conf, "sub", rsrcStatus, true, 20L,
new Path("file:///tmp"), false, true);
assertNotSame("File and Archive resolve to the same path: "
+ localizedPathForFile + ". Should differ.", localizedPathForFile,
localizedPathForArchive);
}
/** test delete cache */ /** test delete cache */
public void testDeleteCache() throws Exception { public void testDeleteCache() throws Exception {
if (!canRun()) { if (!canRun()) {
@ -676,7 +721,7 @@ public void testDeleteCache() throws Exception {
getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
manager.releaseCache(firstCacheFile.toUri(), conf2, manager.releaseCache(firstCacheFile.toUri(), conf2,
getFileStamp(firstCacheFile), getFileStamp(firstCacheFile),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
//in above code,localized a file of size 4K and then release the cache //in above code,localized a file of size 4K and then release the cache
// which will cause the cache be deleted when the limit goes out. // which will cause the cache be deleted when the limit goes out.
// The below code localize another cache which's designed to // The below code localize another cache which's designed to
@ -702,7 +747,7 @@ public void testDeleteCache() throws Exception {
// Release the third cache so that it can be deleted while sweeping // Release the third cache so that it can be deleted while sweeping
manager.releaseCache(thirdCacheFile.toUri(), conf2, manager.releaseCache(thirdCacheFile.toUri(), conf2,
getFileStamp(thirdCacheFile), getFileStamp(thirdCacheFile),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false)); TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
// Getting the fourth cache will make the number of sub directories becomes // Getting the fourth cache will make the number of sub directories becomes
// 3 which is greater than 2. So the released cache will be deleted. // 3 which is greater than 2. So the released cache will be deleted.
manager.getLocalCache(fourthCacheFile.toUri(), conf2, manager.getLocalCache(fourthCacheFile.toUri(), conf2,