YARN-4773. Log aggregation performs extraneous filesystem operations when rolling log aggregation is disabled. Contributed by Jun Gong
This commit is contained in:
parent
8bfaa80037
commit
948b758070
@ -124,11 +124,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||||||
private final long rollingMonitorInterval;
|
private final long rollingMonitorInterval;
|
||||||
private final boolean logAggregationInRolling;
|
private final boolean logAggregationInRolling;
|
||||||
private final NodeId nodeId;
|
private final NodeId nodeId;
|
||||||
// This variable is only for testing
|
|
||||||
private final AtomicBoolean waiting = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
// This variable is only for testing
|
// These variables are only for testing
|
||||||
|
private final AtomicBoolean waiting = new AtomicBoolean(false);
|
||||||
private int logAggregationTimes = 0;
|
private int logAggregationTimes = 0;
|
||||||
|
private int cleanupOldLogTimes = 0;
|
||||||
|
|
||||||
private boolean renameTemporaryLogFileFailed = false;
|
private boolean renameTemporaryLogFileFailed = false;
|
||||||
|
|
||||||
@ -365,8 +365,9 @@ private void uploadLogsForContainers(boolean appFinished) {
|
|||||||
|
|
||||||
// Before upload logs, make sure the number of existing logs
|
// Before upload logs, make sure the number of existing logs
|
||||||
// is smaller than the configured NM log aggregation retention size.
|
// is smaller than the configured NM log aggregation retention size.
|
||||||
if (uploadedLogsInThisCycle) {
|
if (uploadedLogsInThisCycle && logAggregationInRolling) {
|
||||||
cleanOldLogs();
|
cleanOldLogs();
|
||||||
|
cleanupOldLogTimes++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
@ -689,4 +690,9 @@ public UserGroupInformation getUgi() {
|
|||||||
public int getLogAggregationTimes() {
|
public int getLogAggregationTimes() {
|
||||||
return this.logAggregationTimes;
|
return this.logAggregationTimes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getCleanupOldLogTimes() {
|
||||||
|
return this.cleanupOldLogTimes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2276,7 +2276,7 @@ public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception {
|
|||||||
Records.newRecord(LogAggregationContext.class);
|
Records.newRecord(LogAggregationContext.class);
|
||||||
logAggregationContext.setLogAggregationPolicyClassName(
|
logAggregationContext.setLogAggregationPolicyClassName(
|
||||||
FailedOrKilledContainerLogAggregationPolicy.class.getName());
|
FailedOrKilledContainerLogAggregationPolicy.class.getName());
|
||||||
verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2);
|
verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 20000)
|
@Test (timeout = 20000)
|
||||||
@ -2290,13 +2290,13 @@ public void testSkipUnnecessaryNNOperationsForService() throws Exception {
|
|||||||
AMOnlyLogAggregationPolicy.class.getName());
|
AMOnlyLogAggregationPolicy.class.getName());
|
||||||
contextWithAMOnly.setRolledLogsIncludePattern("sys*");
|
contextWithAMOnly.setRolledLogsIncludePattern("sys*");
|
||||||
contextWithAMOnly.setRolledLogsExcludePattern("std_final");
|
contextWithAMOnly.setRolledLogsExcludePattern("std_final");
|
||||||
verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4);
|
verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifySkipUnnecessaryNNOperations(
|
private void verifySkipUnnecessaryNNOperations(
|
||||||
LogAggregationContext logAggregationContext,
|
LogAggregationContext logAggregationContext,
|
||||||
int expectedLogAggregationTimes, int expectedAggregationReportNum)
|
int expectedLogAggregationTimes, int expectedAggregationReportNum,
|
||||||
throws Exception {
|
int expectedCleanupOldLogsTimes) throws Exception {
|
||||||
LogAggregationService logAggregationService = new LogAggregationService(
|
LogAggregationService logAggregationService = new LogAggregationService(
|
||||||
dispatcher, this.context, this.delSrvc, super.dirsHandler);
|
dispatcher, this.context, this.delSrvc, super.dirsHandler);
|
||||||
logAggregationService.init(this.conf);
|
logAggregationService.init(this.conf);
|
||||||
@ -2307,7 +2307,7 @@ private void verifySkipUnnecessaryNNOperations(
|
|||||||
null, this.acls, logAggregationContext));
|
null, this.acls, logAggregationContext));
|
||||||
|
|
||||||
// Container finishes
|
// Container finishes
|
||||||
String[] logFiles = new String[] { "stdout" };
|
String[] logFiles = new String[] { "sysout" };
|
||||||
finishContainer(appId, logAggregationService,
|
finishContainer(appId, logAggregationService,
|
||||||
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
|
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
|
||||||
AppLogAggregatorImpl aggregator =
|
AppLogAggregatorImpl aggregator =
|
||||||
@ -2327,6 +2327,8 @@ private void verifySkipUnnecessaryNNOperations(
|
|||||||
aggregator.getLogAggregationTimes());
|
aggregator.getLogAggregationTimes());
|
||||||
assertEquals(expectedAggregationReportNum,
|
assertEquals(expectedAggregationReportNum,
|
||||||
this.context.getLogAggregationStatusForApps().size());
|
this.context.getLogAggregationStatusForApps().size());
|
||||||
|
assertEquals(expectedCleanupOldLogsTimes,
|
||||||
|
aggregator.getCleanupOldLogTimes());
|
||||||
}
|
}
|
||||||
|
|
||||||
private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
||||||
|
Loading…
Reference in New Issue
Block a user