From 4234bc87b3e0bf7e9716d6ca1873b8bb0239472e Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 11 Apr 2013 02:08:11 +0000 Subject: [PATCH] YARN-539. Addressed memory leak of LocalResource objects NM when a resource localization fails. Contributed by Omkar Vinit Joshi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466756 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 7 + .../localizer/LocalResourcesTracker.java | 3 - .../localizer/LocalResourcesTrackerImpl.java | 42 +++-- .../localizer/LocalizedResource.java | 35 +++- .../ResourceLocalizationService.java | 80 +++------ .../localizer/ResourceState.java | 3 +- .../localizer/event/ResourceEventType.java | 4 +- .../ResourceFailedLocalizationEvent.java | 39 +++++ .../TestLocalResourcesTrackerImpl.java | 161 +++++++++++++++++- 10 files changed, 291 insertions(+), 86 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cef0814800..a7ce38e6c3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -220,6 +220,9 @@ Release 2.0.5-beta - UNRELEASED YARN-534. Change RM restart recovery to also account for AM max-attempts configuration after the restart. (Jian He via vinodkv) + YARN-539. Addressed memory leak of LocalResource objects NM when a resource + localization fails. (Omkar Vinit Joshi via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 247406434e..4ba2d72289 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -270,4 +270,11 @@ + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 2e795e54a1..98ec471abf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -40,8 +40,5 @@ interface LocalResourcesTracker String getUser(); - // TODO: Remove this in favour of EventHandler.handle - void localizationCompleted(LocalResourceRequest req, boolean success); - long nextUniqueNumber(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 53ca9013da..786b58ca5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; /** @@ -96,13 +97,22 @@ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, this.conf = conf; } + /* + * Synchronizing this method for avoiding races due to multiple ResourceEvent's + * coming to LocalResourcesTracker from Public/Private localizer and + * Resource Localization Service. + */ @Override - public void handle(ResourceEvent event) { + public synchronized void handle(ResourceEvent event) { LocalResourceRequest req = event.getLocalResourceRequest(); LocalizedResource rsrc = localrsrc.get(req); switch (event.getType()) { - case REQUEST: case LOCALIZED: + if (useLocalCacheDirectoryManager) { + inProgressLocalResourcesMap.remove(req); + } + break; + case REQUEST: if (rsrc != null && (!isResourcePresent(rsrc))) { LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); @@ -117,10 +127,24 @@ public void handle(ResourceEvent event) { break; case RELEASE: if (null == rsrc) { - LOG.info("Release unknown rsrc null (discard)"); + // The container sent a release event on a resource which + // 1) Failed + // 2) Removed for some reason (ex. disk is no longer accessible) + ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; + LOG.info("Container " + relEvent.getContainer() + + " sent RELEASE event on a resource request " + req + + " not present in cache."); return; } break; + case LOCALIZATION_FAILED: + decrementFileCountForLocalCacheDirectory(req, null); + /* + * If resource localization fails then Localized resource will be + * removed from local cache. + */ + localrsrc.remove(req); + break; } rsrc.handle(event); } @@ -279,18 +303,6 @@ public Iterator iterator() { } } - @Override - public void localizationCompleted(LocalResourceRequest req, - boolean success) { - if (useLocalCacheDirectoryManager) { - if (!success) { - decrementFileCountForLocalCacheDirectory(req, null); - } else { - inProgressLocalResourcesMap.remove(req); - } - } - } - @Override public long nextUniqueNumber() { return uniqueNumberGenerator.incrementAndGet(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index 00709fd91c..f0cd87b573 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -32,10 +32,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; @@ -89,6 +91,8 @@ ResourceEventType.LOCALIZED, new FetchSuccessTransition()) .addTransition(ResourceState.DOWNLOADING, EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), ResourceEventType.RELEASE, new ReleasePendingTransition()) + .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, + ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) // From LOCALIZED (ref >= 0, on disk) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, @@ -126,12 +130,14 @@ public String toString() { } private void release(ContainerId container) { - if (!ref.remove(container)) { - LOG.info("Attempt to release claim on " + this + - " from unregistered container " + container); - assert false; // TODO: FIX + if (ref.remove(container)) { + // updating the timestamp only in case of success. + timestamp.set(currentTime()); + } else { + LOG.info("Container " + container + + " doesn't exist in the container list of the Resource " + this + + " to which it sent RELEASE event"); } - timestamp.set(currentTime()); } private long currentTime() { @@ -250,6 +256,25 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { } } + /** + * Resource localization failed, notify waiting containers. + */ + @SuppressWarnings("unchecked") + private static class FetchFailedTransition extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + ResourceFailedLocalizationEvent failedEvent = + (ResourceFailedLocalizationEvent) event; + Queue containers = rsrc.ref; + Throwable failureCause = failedEvent.getCause(); + for (ContainerId container : containers) { + rsrc.dispatcher.getEventHandler().handle( + new ContainerResourceFailedEvent(container, failedEvent + .getLocalResourceRequest(), failureCause)); + } + } + } + /** * Resource already localized, notify immediately. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 5058cb2cad..7b9873a1f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; -import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; @@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; @@ -683,7 +683,6 @@ public void addResource(LocalizerResourceRequestEvent request) { } @Override - @SuppressWarnings("unchecked") // dispatcher not typed public void run() { try { // TODO shutdown, better error handling esp. DU @@ -699,10 +698,8 @@ public void run() { return; } LocalResourceRequest key = assoc.getResource().getRequest(); - assoc.getResource().handle( - new ResourceLocalizedEvent(key, - local, FileUtil.getDU(new File(local.toUri())))); - publicRsrc.localizationCompleted(key, true); + publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil + .getDU(new File(local.toUri())))); synchronized (attempts) { attempts.remove(key); } @@ -710,13 +707,10 @@ public void run() { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); LocalResourceRequest req = assoc.getResource().getRequest(); - dispatcher.getEventHandler().handle( - new ContainerResourceFailedEvent( - assoc.getContext().getContainerId(), - req, e.getCause())); - publicRsrc.localizationCompleted(req, false); - List reqs; + publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e + .getCause())); synchronized (attempts) { + List reqs; reqs = attempts.get(req); if (null == reqs) { LOG.error("Missing pending list for " + req); @@ -724,13 +718,6 @@ public void run() { } attempts.remove(req); } - // let the other containers know about the localization failure - for (LocalizerResourceRequestEvent reqEvent : reqs) { - dispatcher.getEventHandler().handle( - new ContainerResourceFailedEvent( - reqEvent.getContext().getContainerId(), - reqEvent.getResource().getRequest(), e.getCause())); - } } catch (CancellationException e) { // ignore; shutting down } @@ -810,13 +797,14 @@ private LocalResource findNextResource() { return null; } - // TODO this sucks. Fix it later - @SuppressWarnings("unchecked") // dispatcher not typed LocalizerHeartbeatResponse update( List remoteResourceStatuses) { LocalizerHeartbeatResponse response = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); + String user = context.getUser(); + ApplicationId applicationId = + context.getContainerId().getApplicationAttemptId().getApplicationId(); // The localizer has just spawned. Start giving it resources for // remote-fetching. if (remoteResourceStatuses.isEmpty()) { @@ -847,6 +835,11 @@ LocalizerHeartbeatResponse update( } ArrayList rsrcs = new ArrayList(); + /* + * TODO : It doesn't support multiple downloads per ContainerLocalizer + * at the same time. We need to think whether we should support this. + */ + for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); LocalResourceRequest req = null; @@ -865,11 +858,10 @@ LocalizerHeartbeatResponse update( case FETCH_SUCCESS: // notify resource try { - assoc.getResource().handle( - new ResourceLocalizedEvent(req, - ConverterUtils.getPathFromYarnURL(stat.getLocalPath()), - stat.getLocalSize())); - localizationCompleted(stat); + getLocalResourcesTracker(req.getVisibility(), user, applicationId) + .handle( + new ResourceLocalizedEvent(req, ConverterUtils + .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); } catch (URISyntaxException e) { } if (pending.isEmpty()) { // TODO: Synchronization @@ -899,19 +891,16 @@ LocalizerHeartbeatResponse update( LOG.info("DEBUG: FAILED " + req, stat.getException()); assoc.getResource().unlock(); response.setLocalizerAction(LocalizerAction.DIE); - localizationCompleted(stat); - // TODO: Why is this event going directly to the container. Why not - // the resource itself? What happens to the resource? Is it removed? - dispatcher.getEventHandler().handle( - new ContainerResourceFailedEvent(context.getContainerId(), - req, stat.getException())); + getLocalResourcesTracker(req.getVisibility(), user, applicationId) + .handle( + new ResourceFailedLocalizationEvent(req, stat.getException())); break; default: LOG.info("Unknown status: " + stat.getStatus()); response.setLocalizerAction(LocalizerAction.DIE); - dispatcher.getEventHandler().handle( - new ContainerResourceFailedEvent(context.getContainerId(), - req, stat.getException())); + getLocalResourcesTracker(req.getVisibility(), user, applicationId) + .handle( + new ResourceFailedLocalizationEvent(req, stat.getException())); break; } } @@ -919,27 +908,6 @@ LocalizerHeartbeatResponse update( return response; } - private void localizationCompleted(LocalResourceStatus stat) { - try { - LocalResource rsrc = stat.getResource(); - LocalResourceRequest key = new LocalResourceRequest(rsrc); - String user = context.getUser(); - ApplicationId appId = - context.getContainerId().getApplicationAttemptId() - .getApplicationId(); - LocalResourceVisibility vis = rsrc.getVisibility(); - LocalResourcesTracker tracker = - getLocalResourcesTracker(vis, user, appId); - if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) { - tracker.localizationCompleted(key, true); - } else { - tracker.localizationCompleted(key, false); - } - } catch (URISyntaxException e) { - LOG.error("Invalid resource URL specified", e); - } - } - private Path getPathForLocalization(LocalResource rsrc) throws IOException, URISyntaxException { String user = context.getUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java index 751f60e0af..75c8ad7663 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java @@ -20,5 +20,6 @@ enum ResourceState { INIT, DOWNLOADING, - LOCALIZED + LOCALIZED, + FAILED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java index d68a1b6d39..e657c0acf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java @@ -29,5 +29,7 @@ public enum ResourceEventType { /** See {@link ResourceLocalizedEvent} */ LOCALIZED, /** See {@link ResourceReleaseEvent} */ - RELEASE + RELEASE, + /** See {@link ResourceFailedLocalizationEvent} */ + LOCALIZATION_FAILED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java new file mode 100644 index 0000000000..79b28bac90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +/** + * This event is sent by the localizer in case resource localization fails for + * the requested resource. + */ +public class ResourceFailedLocalizationEvent extends ResourceEvent { + + private Throwable cause; + + public ResourceFailedLocalizationEvent(LocalResourceRequest rsrc, + Throwable cause) { + super(rsrc, ResourceEventType.LOCALIZATION_FAILED); + this.cause = cause; + } + + public Throwable getCause() { + return cause; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index a8bbdb0352..b2caba02e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import static org.mockito.Mockito.any; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -41,11 +42,15 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; @@ -224,6 +229,142 @@ public void testConsistency() { } } + @Test(timeout = 1000) + @SuppressWarnings("unchecked") + public void testLocalResourceCache() { + String user = "testuser"; + DrainDispatcher dispatcher = null; + try { + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); + + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + + // Registering event handlers. + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + LocalResourcesTracker tracker = + new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf); + + LocalResourceRequest lr = + createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC); + + // Creating 2 containers for same application which will be requesting + // same local resource. + // Container 1 requesting local resource. + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + ResourceEvent reqEvent1 = + new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc1); + + // No resource request is initially present in local cache + Assert.assertEquals(0, localrsrc.size()); + + // Container-1 requesting local resource. + tracker.handle(reqEvent1); + + // New localized Resource should have been added to local resource map + // and the requesting container will be added to its waiting queue. + Assert.assertEquals(1, localrsrc.size()); + Assert.assertTrue(localrsrc.containsKey(lr)); + Assert.assertEquals(1, localrsrc.get(lr).getRefCount()); + Assert.assertTrue(localrsrc.get(lr).ref.contains(cId1)); + Assert.assertEquals(ResourceState.DOWNLOADING, localrsrc.get(lr) + .getState()); + + // Container 2 requesting the resource + ContainerId cId2 = BuilderUtils.newContainerId(1, 1, 1, 2); + LocalizerContext lc2 = new LocalizerContext(user, cId2, null); + ResourceEvent reqEvent2 = + new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2); + tracker.handle(reqEvent2); + + // Container 2 should have been added to the waiting queue of the local + // resource + Assert.assertEquals(2, localrsrc.get(lr).getRefCount()); + Assert.assertTrue(localrsrc.get(lr).ref.contains(cId2)); + + // Failing resource localization + ResourceEvent resourceFailedEvent = + new ResourceFailedLocalizationEvent(lr, new Exception("test")); + + // Backing up the resource to track its state change as it will be + // removed after the failed event. + LocalizedResource localizedResource = localrsrc.get(lr); + + tracker.handle(resourceFailedEvent); + + // After receiving failed resource event; all waiting containers will be + // notified with Container Resource Failed Event. + Assert.assertEquals(0, localrsrc.size()); + verify(containerEventHandler, times(2)).handle( + isA(ContainerResourceFailedEvent.class)); + Assert.assertEquals(ResourceState.FAILED, localizedResource.getState()); + + // Container 1 trying to release the resource (This resource is already + // deleted from the cache. This call should return silently without + // exception. + ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1); + tracker.handle(relEvent1); + + // Container-3 now requests for the same resource. This request call + // is coming prior to Container-2's release call. + ContainerId cId3 = BuilderUtils.newContainerId(1, 1, 1, 3); + LocalizerContext lc3 = new LocalizerContext(user, cId3, null); + ResourceEvent reqEvent3 = + new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3); + tracker.handle(reqEvent3); + + // Local resource cache now should have the requested resource and the + // number of waiting containers should be 1. + Assert.assertEquals(1, localrsrc.size()); + Assert.assertTrue(localrsrc.containsKey(lr)); + Assert.assertEquals(1, localrsrc.get(lr).getRefCount()); + Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3)); + + // Container-2 Releases the resource + ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2); + tracker.handle(relEvent2); + + // Making sure that there is no change in the cache after the release. + Assert.assertEquals(1, localrsrc.size()); + Assert.assertTrue(localrsrc.containsKey(lr)); + Assert.assertEquals(1, localrsrc.get(lr).getRefCount()); + Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3)); + + // Sending ResourceLocalizedEvent to tracker. In turn resource should + // send Container Resource Localized Event to waiting containers. + Path localizedPath = new Path("/tmp/file1"); + ResourceLocalizedEvent localizedEvent = + new ResourceLocalizedEvent(lr, localizedPath, 123L); + tracker.handle(localizedEvent); + + // Verifying ContainerResourceLocalizedEvent . + verify(containerEventHandler, times(1)).handle( + isA(ContainerResourceLocalizedEvent.class)); + Assert.assertEquals(ResourceState.LOCALIZED, localrsrc.get(lr) + .getState()); + Assert.assertEquals(1, localrsrc.get(lr).getRefCount()); + + // Container-3 releasing the resource. + ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3); + tracker.handle(relEvent3); + + Assert.assertEquals(0, localrsrc.get(lr).getRefCount()); + + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + @Test(timeout = 100000) @SuppressWarnings("unchecked") public void testHierarchicalLocalCacheDirectories() { @@ -266,19 +407,25 @@ public void testHierarchicalLocalCacheDirectories() { // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); // Simulate lr1 getting localized - ResourceLocalizedEvent rle = + ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(lr1, new Path(hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "file1"), 120); - tracker.handle(rle); + tracker.handle(rle1); // Localization successful. - tracker.localizationCompleted(lr1, true); LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3, LocalResourceVisibility.PUBLIC); + // Container 1 requests lr2 to be localized. + ResourceEvent reqEvent2 = + new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent2); + Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); // localization failed. - tracker.localizationCompleted(lr2, false); + ResourceFailedLocalizationEvent rfe2 = + new ResourceFailedLocalizationEvent(lr2, new Exception("Test")); + tracker.handle(rfe2); /* * The path returned for two localization should be different because we @@ -292,7 +439,11 @@ public void testHierarchicalLocalCacheDirectories() { LocalResourceVisibility.PUBLIC, lc1); tracker.handle(reqEvent3); Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir); - tracker.localizationCompleted(lr3, true); + // localization successful + ResourceLocalizedEvent rle3 = + new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri() + .toString() + Path.SEPARATOR + "file3"), 120); + tracker.handle(rle3); // Verifying that path created is inside the subdirectory Assert.assertEquals(hierarchicalPath3.toUri().toString(),