diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 620ccb84a3..8ca1299e6b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -156,6 +156,9 @@ Branch-2 ( Unreleased changes ) MAPREDUCE-4465. Update description of yarn.nodemanager.address property. (bowang via tucu) + MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files + get deleted from tasktracker. (mayank_bansal via tucu) + Release 2.1.0-alpha - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 7127db97c0..01ec38397b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import java.io.File; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; /** * A collection of {@link LocalizedResource}s all of same @@ -67,6 +69,12 @@ public void handle(ResourceEvent event) { switch (event.getType()) { case REQUEST: case LOCALIZED: + if (rsrc != null && (!isResourcePresent(rsrc))) { + LOG.info("Resource " + rsrc.getLocalPath() + + " is missing, localizing it again"); + localrsrc.remove(req); + rsrc = null; + } if (null == rsrc) { rsrc = new LocalizedResource(req, dispatcher); localrsrc.put(req, rsrc); @@ -82,6 +90,24 @@ public void handle(ResourceEvent event) { rsrc.handle(event); } + /** + * This module checks if the resource which was localized is already present + * or not + * + * @param rsrc + * @return true/false based on resource is present or not + */ + public boolean isResourcePresent(LocalizedResource rsrc) { + boolean ret = true; + if (rsrc.getState() == ResourceState.LOCALIZED) { + File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString()); + if (!file.exists()) { + ret = false; + } + } + return ret; + } + @Override public boolean contains(LocalResourceRequest resource) { return localrsrc.containsKey(resource); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 7f0e353697..3ee623ceca 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -5,6 +5,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.File; +import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; +import org.mortbay.log.Log; public class TestLocalResourcesTrackerImpl { @@ -131,6 +134,86 @@ public void test() { } } + @Test + @SuppressWarnings("unchecked") + public void testConsistency() { + String user = "testuser"; + DrainDispatcher dispatcher = null; + try { + dispatcher = createDispatcher(new Configuration()); + EventHandler localizerEventHandler = mock(EventHandler.class); + EventHandler containerEventHandler = mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + LocalizedResource lr1 = createLocalizedResource(req1, dispatcher); + ConcurrentMap localrsrc = new ConcurrentHashMap(); + localrsrc.put(req1, lr1); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + dispatcher, localrsrc); + + ResourceEvent req11Event = new ResourceRequestEvent(req1, + LocalResourceVisibility.PUBLIC, lc1); + + ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1); + + // Localize R1 for C1 + tracker.handle(req11Event); + + dispatcher.await(); + + // Verify refCount for R1 is 1 + Assert.assertEquals(1, lr1.getRefCount()); + + dispatcher.await(); + verifyTrackedResourceCount(tracker, 1); + + // Localize resource1 + ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path( + "file:///tmp/r1"), 1); + lr1.handle(rle); + Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED)); + Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1"))); + LocalizedResource rsrcbefore = tracker.iterator().next(); + File resFile = new File(lr1.getLocalPath().toUri().getRawPath() + .toString()); + Assert.assertTrue(resFile.exists()); + Assert.assertTrue(resFile.delete()); + + // Localize R1 for C1 + tracker.handle(req11Event); + + dispatcher.await(); + lr1.handle(rle); + Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED)); + LocalizedResource rsrcafter = tracker.iterator().next(); + if (rsrcbefore == rsrcafter) { + Assert.fail("Localized resource should not be equal"); + } + // Release resource1 + tracker.handle(rel11Event); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + + private boolean createdummylocalizefile(Path path) { + boolean ret = false; + File file = new File(path.toUri().getRawPath().toString()); + try { + ret = file.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + return ret; + } + private void verifyTrackedResourceCount(LocalResourcesTracker tracker, int expected) { int count = 0;