YARN-4096. App local logs are leaked if log aggregation fails to initialize for the app. Contributed by Jason Lowe.

This commit is contained in:
Zhihai Xu 2015-09-08 12:29:54 -07:00
parent 970daaa5e4
commit 16b9037dc1
5 changed files with 31 additions and 10 deletions

View File

@ -898,6 +898,9 @@ Release 2.7.2 - UNRELEASED
YARN-4087. Followup fixes after YARN-2019 regarding RM behavior when YARN-4087. Followup fixes after YARN-2019 regarding RM behavior when
state-store error occurs. (Jian He via xgong) state-store error occurs. (Jian He via xgong)
YARN-4096. App local logs are leaked if log aggregation fails to initialize
for the app. (Jason Lowe via zxu)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -27,4 +27,6 @@ public interface AppLogAggregator extends Runnable {
void abortLogAggregation(); void abortLogAggregation();
void finishLogAggregation(); void finishLogAggregation();
void disableLogAggregation();
} }

View File

@ -596,6 +596,11 @@ public synchronized void abortLogAggregation() {
this.notifyAll(); this.notifyAll();
} }
/**
 * Marks log aggregation as disabled for this aggregator by setting the
 * {@code logAggregationDisabled} flag to {@code true}.
 *
 * <p>NOTE(review): the code that reads this flag is not visible here;
 * presumably the aggregator then skips uploading logs while still running
 * its normal local-log cleanup -- confirm against the aggregation loop.
 *
 * <p>NOTE(review): this method is not synchronized and the field's
 * declaration is not visible here; confirm the write is safely published
 * to the thread that runs this aggregator.
 */
@Override
public void disableLogAggregation() {
this.logAggregationDisabled = true;
}
@Private @Private
@VisibleForTesting @VisibleForTesting
// This is only used for testing. // This is only used for testing.

View File

@ -363,19 +363,19 @@ protected void initAppAggregator(final ApplicationId appId, String user,
throw new YarnRuntimeException("Duplicate initApp for " + appId); throw new YarnRuntimeException("Duplicate initApp for " + appId);
} }
// wait until check for existing aggregator to create dirs // wait until check for existing aggregator to create dirs
YarnRuntimeException appDirException = null;
try { try {
// Create the app dir // Create the app dir
createAppDir(user, appId, userUgi); createAppDir(user, appId, userUgi);
} catch (Exception e) { } catch (Exception e) {
appLogAggregators.remove(appId); appLogAggregator.disableLogAggregation();
closeFileSystems(userUgi);
if (!(e instanceof YarnRuntimeException)) { if (!(e instanceof YarnRuntimeException)) {
e = new YarnRuntimeException(e); appDirException = new YarnRuntimeException(e);
} else {
appDirException = (YarnRuntimeException)e;
} }
throw (YarnRuntimeException)e;
} }
// TODO Get the user configuration for the list of containers that need log // TODO Get the user configuration for the list of containers that need log
// aggregation. // aggregation.
@ -391,6 +391,10 @@ public void run() {
} }
}; };
this.threadPool.execute(aggregatorWrapper); this.threadPool.execute(aggregatorWrapper);
if (appDirException != null) {
throw appDirException;
}
} }
protected void closeFileSystems(final UserGroupInformation userUgi) { protected void closeFileSystems(final UserGroupInformation userUgi) {

View File

@ -731,9 +731,10 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath()); this.remoteRootLogDir.getAbsolutePath());
DeletionService spyDelSrvc = spy(this.delSrvc);
LogAggregationService logAggregationService = spy( LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc, new LogAggregationService(dispatcher, this.context, spyDelSrvc,
super.dirsHandler)); super.dirsHandler));
logAggregationService.init(this.conf); logAggregationService.init(this.conf);
logAggregationService.start(); logAggregationService.start();
@ -741,6 +742,11 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
ApplicationId appId = ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(), BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000)); (int) (Math.random() * 1000));
File appLogDir =
new File(localLogDir, ConverterUtils.toString(appId));
appLogDir.mkdir();
Exception e = new RuntimeException("KABOOM!"); Exception e = new RuntimeException("KABOOM!");
doThrow(e) doThrow(e)
.when(logAggregationService).createAppDir(any(String.class), .when(logAggregationService).createAppDir(any(String.class),
@ -759,9 +765,6 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
}; };
checkEvents(appEventHandler, expectedEvents, false, checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic"); "getType", "getApplicationID", "getDiagnostic");
// filesystems may have been instantiated
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
// verify trying to collect logs for containers/apps we don't know about // verify trying to collect logs for containers/apps we don't know about
// doesn't blow up and tear down the NM // doesn't blow up and tear down the NM
@ -774,6 +777,10 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
logAggregationService.stop(); logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators()); assertEquals(0, logAggregationService.getNumAggregators());
verify(spyDelSrvc).delete(eq(user), any(Path.class),
Mockito.<Path>anyVararg());
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
} }
private void writeContainerLogs(File appLogDir, ContainerId containerId, private void writeContainerLogs(File appLogDir, ContainerId containerId,