From 16b9037dc1300b8bdbe54ba7cd47c53fe16e93d8 Mon Sep 17 00:00:00 2001 From: Zhihai Xu Date: Tue, 8 Sep 2015 12:29:54 -0700 Subject: [PATCH] YARN-4096. App local logs are leaked if log aggregation fails to initialize for the app. Contributed by Jason Lowe. --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../logaggregation/AppLogAggregator.java | 2 ++ .../logaggregation/AppLogAggregatorImpl.java | 5 +++++ .../logaggregation/LogAggregationService.java | 14 +++++++++----- .../TestLogAggregationService.java | 17 ++++++++++++----- 5 files changed, 31 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6d3796af33..73080757b1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -898,6 +898,9 @@ Release 2.7.2 - UNRELEASED YARN-4087. Followup fixes after YARN-2019 regarding RM behavior when state-store error occurs. (Jian He via xgong) + YARN-4096. App local logs are leaked if log aggregation fails to initialize + for the app. (Jason Lowe via zxu) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 83c5d5a624..0178699573 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -27,4 +27,6 @@ public interface AppLogAggregator extends Runnable { void abortLogAggregation(); void finishLogAggregation(); + + void disableLogAggregation(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 742b8a98cd..b2342c7c57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -596,6 +596,11 @@ public synchronized void abortLogAggregation() { this.notifyAll(); } + @Override + public void disableLogAggregation() { + this.logAggregationDisabled = true; + } + @Private @VisibleForTesting // This is only used for testing. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 259e9ae960..6a6f101a88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -363,19 +363,19 @@ protected void initAppAggregator(final ApplicationId appId, String user, throw new YarnRuntimeException("Duplicate initApp for " + appId); } // wait until check for existing aggregator to create dirs + YarnRuntimeException appDirException = null; try { // Create the app dir createAppDir(user, appId, userUgi); } catch (Exception e) { - appLogAggregators.remove(appId); - closeFileSystems(userUgi); + appLogAggregator.disableLogAggregation(); if (!(e instanceof YarnRuntimeException)) { - e = new YarnRuntimeException(e); + appDirException = new YarnRuntimeException(e); + } else { + appDirException = (YarnRuntimeException)e; } - throw (YarnRuntimeException)e; } - // TODO Get the user configuration for the list of containers that need log // aggregation. @@ -391,6 +391,10 @@ public void run() { } }; this.threadPool.execute(aggregatorWrapper); + + if (appDirException != null) { + throw appDirException; + } } protected void closeFileSystems(final UserGroupInformation userUgi) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 77d75cae7b..77c6e3c42a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -731,9 +731,10 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - + + DeletionService spyDelSrvc = spy(this.delSrvc); LogAggregationService logAggregationService = spy( - new LogAggregationService(dispatcher, this.context, this.delSrvc, + new LogAggregationService(dispatcher, this.context, spyDelSrvc, super.dirsHandler)); logAggregationService.init(this.conf); logAggregationService.start(); @@ -741,6 +742,11 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000)); + + File appLogDir = + new File(localLogDir, ConverterUtils.toString(appId)); + appLogDir.mkdir(); + Exception e = new RuntimeException("KABOOM!"); doThrow(e) .when(logAggregationService).createAppDir(any(String.class), @@ -759,9 +765,6 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - // filesystems may have been instantiated - verify(logAggregationService).closeFileSystems( - any(UserGroupInformation.class)); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM @@ -774,6 +777,10 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); + verify(spyDelSrvc).delete(eq(user), any(Path.class), + Mockito.anyVararg()); + verify(logAggregationService).closeFileSystems( + any(UserGroupInformation.class)); } private void writeContainerLogs(File appLogDir, ContainerId containerId,