diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c415c3fcf7..e8c46a72b0 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -290,6 +290,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth via bobby) + MAPREDUCE-3977. LogAggregationService leaks log aggregator objects + (Jason Lowe via bobby) + Release 0.23.1 - 2012-02-17 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 173bc95943..f7cf6c53b9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -139,12 +140,6 @@ public synchronized void stop() { super.stop(); } - - - - - - private void verifyAndCreateRemoteLogDir(Configuration conf) { // Checking the existance of the TLD FileSystem remoteFS = null; @@ -289,7 +284,7 @@ private void initApp(final ApplicationId appId, String user, createAppDir(user, appId, userUgi); // New application - AppLogAggregator appLogAggregator = + final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, @@ -303,7 +298,22 @@ private void initApp(final ApplicationId appId, String user, // aggregation. // Schedule the aggregator. - this.threadPool.execute(appLogAggregator); + Runnable aggregatorWrapper = new Runnable() { + public void run() { + try { + appLogAggregator.run(); + } finally { + appLogAggregators.remove(appId); + } + } + }; + this.threadPool.execute(aggregatorWrapper); + } + + // for testing only + @Private + int getNumAggregators() { + return this.appLogAggregators.size(); } private void stopContainer(ContainerId containerId, int exitCode) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 18b8d9b913..ab40335703 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -565,4 +565,38 @@ public void testStopAfterError() throws Exception { logAggregationService.stop(); } + + @Test + @SuppressWarnings("unchecked") + public void testLogAggregatorCleanup() throws Exception { + DeletionService delSrvc = mock(DeletionService.class); + + // get the AppLogAggregationImpl thread to crash + LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, delSrvc, + mockedDirSvc); + logAggregationService.init(this.conf); + logAggregationService.start(); + + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + logAggregationService.handle(new LogHandlerAppStartedEvent( + application1, this.user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); + dispatcher.await(); + int timeToWait = 20 * 1000; + while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) { + Thread.sleep(100); + timeToWait -= 100; + } + Assert.assertEquals("Log aggregator failed to cleanup!", 0, + logAggregationService.getNumAggregators()); + } }