MAPREDUCE-7118. Distributed cache conflicts breaks backwards compatability. (Jason Lowe via wangda)
Change-Id: I89ab4852b4ad305fec19812e8931c59d96581376
This commit is contained in:
parent
2564884757
commit
b3b4d4ccb5
@ -27,7 +27,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
@ -144,10 +143,9 @@ void createLocalResources(Map<String, LocalResource> localResources)
|
|||||||
|
|
||||||
LocalResource orig = localResources.get(linkName);
|
LocalResource orig = localResources.get(linkName);
|
||||||
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
|
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
|
||||||
throw new InvalidJobConfException(
|
LOG.warn(getResourceDescription(orig.getType()) + orig.getResource()
|
||||||
getResourceDescription(orig.getType()) + orig.getResource()
|
+ " conflicts with " + getResourceDescription(type) + u);
|
||||||
+
|
continue;
|
||||||
" conflicts with " + getResourceDescription(type) + u);
|
|
||||||
}
|
}
|
||||||
Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString());
|
Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString());
|
||||||
sharedCachePolicy =
|
sharedCachePolicy =
|
||||||
|
@ -360,7 +360,7 @@ public void testSetupDistributedCacheEmpty() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Test(timeout = 120000, expected = InvalidJobConfException.class)
|
@Test(timeout = 120000)
|
||||||
public void testSetupDistributedCacheConflicts() throws Exception {
|
public void testSetupDistributedCacheConflicts() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
@ -388,10 +388,18 @@ public void testSetupDistributedCacheConflicts() throws Exception {
|
|||||||
Map<String, LocalResource> localResources =
|
Map<String, LocalResource> localResources =
|
||||||
new HashMap<String, LocalResource>();
|
new HashMap<String, LocalResource>();
|
||||||
MRApps.setupDistributedCache(conf, localResources);
|
MRApps.setupDistributedCache(conf, localResources);
|
||||||
|
|
||||||
|
assertEquals(1, localResources.size());
|
||||||
|
LocalResource lr = localResources.get("something");
|
||||||
|
//Archive wins
|
||||||
|
assertNotNull(lr);
|
||||||
|
assertEquals(10l, lr.getSize());
|
||||||
|
assertEquals(10l, lr.getTimestamp());
|
||||||
|
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Test(timeout = 120000, expected = InvalidJobConfException.class)
|
@Test(timeout = 120000)
|
||||||
public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
@ -416,6 +424,14 @@ public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
|||||||
Map<String, LocalResource> localResources =
|
Map<String, LocalResource> localResources =
|
||||||
new HashMap<String, LocalResource>();
|
new HashMap<String, LocalResource>();
|
||||||
MRApps.setupDistributedCache(conf, localResources);
|
MRApps.setupDistributedCache(conf, localResources);
|
||||||
|
|
||||||
|
assertEquals(1, localResources.size());
|
||||||
|
LocalResource lr = localResources.get("something");
|
||||||
|
//First one wins
|
||||||
|
assertNotNull(lr);
|
||||||
|
assertEquals(10l, lr.getSize());
|
||||||
|
assertEquals(10l, lr.getTimestamp());
|
||||||
|
assertEquals(LocalResourceType.FILE, lr.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
Loading…
Reference in New Issue
Block a user