From 8aab8533a1b07c632a1b4da596dfa65302afcb18 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 11 Mar 2014 23:33:56 +0000 Subject: [PATCH] YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException in the public-localizer thread-pool. Contributed by Varun Vasudev. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576545 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../ResourceLocalizationService.java | 10 +- .../TestResourceLocalizationService.java | 120 ++++++++++++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a222fed796..6b657abef9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -446,6 +446,9 @@ Release 2.4.0 - UNRELEASED YARN-1821. NPE on registerNodeManager if the request has containers for UnmanagedAMs. (kasha) + YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException + in the public-localizer thread-pool. 
(Varun Vasudev via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 97c68aae9e..5e20b539aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -683,9 +684,16 @@ public void addResource(LocalizerResourceRequestEvent request) { } } catch (IOException e) { rsrc.unlock(); - // TODO Need to Fix IO Exceptions - Notifying resource + publicRsrc.handle(new ResourceFailedLocalizationEvent(request + .getResource().getRequest(), e.getMessage())); LOG.error("Local path for public localization is not found. " + " May be disks failed.", e); + } catch (RejectedExecutionException re) { + rsrc.unlock(); + publicRsrc.handle(new ResourceFailedLocalizationEvent(request + .getResource().getRequest(), re.getMessage())); + LOG.error("Failed to submit rsrc " + rsrc + " for download." 
+ + " Either queue is full or threadpool is shutdown.", re); } } else { rsrc.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 8b7c91aa82..a28785def4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; @@ -126,6 +128,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -681,6 +684,121 @@ public Void answer(InvocationOnMock invocation) throws IOException { dispatcher.stop(); } } + + /* + * Test case for handling RejectedExecutionException and IOException which can + * be thrown when adding public resources to the pending queue. + * RejectedExecutionException can be thrown either due to the incoming queue + * being full or if the ExecutorCompletionService threadpool is shutdown. + * Since it's hard to simulate the queue being full, this test just shuts down + * the threadpool and makes sure the exception is handled. If anything is + * messed up the async dispatcher thread will cause a system exit causing the + * test to fail. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testPublicResourceAddResourceExceptions() throws Exception { + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + + DrainDispatcher dispatcher = new DrainDispatcher(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler); + dirsHandlerSpy.init(conf); + + dispatcher.init(conf); + dispatcher.start(); + + try { + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandlerSpy); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext( + isA(Configuration.class)); + + spyService.init(conf); + spyService.start(); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // init resources + Random r = new Random(); + r.setSeed(r.nextLong()); + + // Queue localization request for the 
public resource + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req = + new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>(); + req + .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq)); + + // init container. + final Container c = getMockContainer(appId, 42); + + // first test ioexception + Mockito + .doThrow(new IOException()) + .when(dirsHandlerSpy) + .getLocalPathForWrite(isA(String.class), Mockito.anyLong(), + Mockito.anyBoolean()); + // send request + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + LocalResourcesTracker tracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + user, appId); + Assert.assertNull(tracker.getLocalizedResource(pubReq)); + + // test RejectedExecutionException + Mockito + .doCallRealMethod() + .when(dirsHandlerSpy) + .getLocalPathForWrite(isA(String.class), Mockito.anyLong(), + Mockito.anyBoolean()); + + // shutdown the thread pool + PublicLocalizer publicLocalizer = spyService.getPublicLocalizer(); + publicLocalizer.threadPool.shutdown(); + + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + tracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + user, appId); + Assert.assertNull(tracker.getLocalizedResource(pubReq)); + + } finally { + // if we call stop with events in the queue, an InterruptedException gets + // thrown resulting in the dispatcher thread causing a system exit + dispatcher.await(); + dispatcher.stop(); + } + } @Test(timeout = 100000) @SuppressWarnings("unchecked") @@ -829,6 +947,8 @@ rls.new LocalizerRunner(new LocalizerContext(user, container2 } } } + + @Test(timeout = 10000) @SuppressWarnings("unchecked")