YARN-11277. Trigger log-dir deletion by size for NonAggregatingLogHandler. (#4797)
Reviewed-by: Akira Ajisaka <aajisaka@apache.org> Reviewed-by: Ashutosh Gupta <ashugpt@amazon.com> Reviewed-by: Shilun Fan <slfan1989@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
parent
e0a339223a
commit
ee94f6cdcb
@ -4960,6 +4960,17 @@ public static boolean areNodeLabelsEnabled(
|
||||
public static final String APPS_CACHE_EXPIRE = YARN_PREFIX + "apps.cache.expire";
|
||||
public static final String DEFAULT_APPS_CACHE_EXPIRE = "30s";
|
||||
|
||||
/** Enabled trigger log-dir deletion by size for NonAggregatingLogHandler. */
|
||||
public static final String NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = NM_PREFIX +
|
||||
"log.trigger.delete.by-size.enabled";
|
||||
public static final boolean DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = false;
|
||||
|
||||
/** Trigger log-dir deletion when the total log size of an app is greater than
|
||||
* yarn.nodemanager.log.delete.threshold.
|
||||
* Depends on yarn.nodemanager.log.trigger.delete.by-size.enabled = true. */
|
||||
public static final String NM_LOG_DELETE_THRESHOLD = NM_PREFIX + "log.delete.threshold";
|
||||
public static final long DEFAULT_NM_LOG_DELETE_THRESHOLD = 100L * 1024 * 1024 * 1024;
|
||||
|
||||
public YarnConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
@ -5293,4 +5293,27 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.log.trigger.delete.by-size.enabled</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Optional.
|
||||
Enabled trigger log-dir deletion by size for NonAggregatingLogHandler
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.nodemanager.log.delete.threshold</name>
|
||||
<value>100g</value>
|
||||
<description>
|
||||
Optional.
|
||||
Trigger log-dir deletion when the total log size of an app is greater than
|
||||
yarn.nodemanager.log.delete.threshold and
|
||||
yarn.nodemanager.log.trigger.delete.by-size.enabled = true.
|
||||
You can use the following suffix (case insensitive): k(kilo), m(mega), g(giga), t(tera), p(peta),
|
||||
e(exa) to specify the size (such as 128k, 512m, 1g, etc.),
|
||||
Or provide complete size in bytes (such as 134217728 for 128 MB).
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -71,6 +71,8 @@ public class NonAggregatingLogHandler extends AbstractService implements
|
||||
private final LocalDirsHandlerService dirsHandler;
|
||||
private final NMStateStoreService stateStore;
|
||||
private long deleteDelaySeconds;
|
||||
private boolean enableTriggerDeleteBySize;
|
||||
private long deleteThreshold;
|
||||
private ScheduledThreadPoolExecutor sched;
|
||||
|
||||
public NonAggregatingLogHandler(Dispatcher dispatcher,
|
||||
@ -90,6 +92,12 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.deleteDelaySeconds =
|
||||
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
|
||||
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
|
||||
this.enableTriggerDeleteBySize =
|
||||
conf.getBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED,
|
||||
YarnConfiguration.DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED);
|
||||
this.deleteThreshold =
|
||||
conf.getLongBytes(YarnConfiguration.NM_LOG_DELETE_THRESHOLD,
|
||||
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THRESHOLD);
|
||||
sched = createScheduledThreadPoolExecutor(conf);
|
||||
super.serviceInit(conf);
|
||||
recover();
|
||||
@ -165,13 +173,9 @@ public void handle(LogHandlerEvent event) {
|
||||
LogHandlerAppFinishedEvent appFinishedEvent =
|
||||
(LogHandlerAppFinishedEvent) event;
|
||||
ApplicationId appId = appFinishedEvent.getApplicationId();
|
||||
// Schedule - so that logs are available on the UI till they're deleted.
|
||||
LOG.info("Scheduling Log Deletion for application: "
|
||||
+ appId + ", with delay of "
|
||||
+ this.deleteDelaySeconds + " seconds");
|
||||
String user = appOwners.remove(appId);
|
||||
if (user == null) {
|
||||
LOG.error("Unable to locate user for " + appId);
|
||||
LOG.error("Unable to locate user for {}", appId);
|
||||
// send LOG_HANDLING_FAILED out
|
||||
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
|
||||
new ApplicationEvent(appId,
|
||||
@ -191,8 +195,20 @@ public void handle(LogHandlerEvent event) {
|
||||
LOG.error("Unable to record log deleter state", e);
|
||||
}
|
||||
try {
|
||||
sched.schedule(logDeleter, this.deleteDelaySeconds,
|
||||
TimeUnit.SECONDS);
|
||||
boolean logDeleterStarted = false;
|
||||
if (enableTriggerDeleteBySize) {
|
||||
final long appLogSize = calculateSizeOfAppLogs(user, appId);
|
||||
if (appLogSize >= deleteThreshold) {
|
||||
LOG.info("Log Deletion for application: {}, with no delay, size={}", appId, appLogSize);
|
||||
sched.schedule(logDeleter, 0, TimeUnit.SECONDS);
|
||||
logDeleterStarted = true;
|
||||
}
|
||||
}
|
||||
if (!logDeleterStarted) {
|
||||
LOG.info("Scheduling Log Deletion for application: {}, with delay of {} seconds",
|
||||
appId, this.deleteDelaySeconds);
|
||||
sched.schedule(logDeleter, this.deleteDelaySeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
// Handling this event in local thread before starting threads
|
||||
// or after calling sched.shutdownNow().
|
||||
@ -200,7 +216,6 @@ public void handle(LogHandlerEvent event) {
|
||||
}
|
||||
break;
|
||||
default:
|
||||
; // Ignore
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,6 +235,24 @@ ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
|
||||
return sched;
|
||||
}
|
||||
|
||||
private long calculateSizeOfAppLogs(String user, ApplicationId applicationId) {
|
||||
FileContext lfs = getLocalFileContext(getConfig());
|
||||
long appLogsSize = 0L;
|
||||
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
|
||||
Path logDir = new Path(rootLogDir, applicationId.toString());
|
||||
try {
|
||||
appLogsSize += lfs.getFileStatus(logDir).getLen();
|
||||
} catch (UnsupportedFileSystemException ue) {
|
||||
LOG.warn("Unsupported file system used for log dir {}", logDir, ue);
|
||||
continue;
|
||||
} catch (IOException ie) {
|
||||
LOG.error("Unable to getFileStatus for {}", logDir, ie);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return appLogsSize;
|
||||
}
|
||||
|
||||
class LogDeleterRunnable implements Runnable {
|
||||
private String user;
|
||||
private ApplicationId applicationId;
|
||||
|
@ -596,4 +596,103 @@ public void resetLogHandlingEvent() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogSizeThresholdDeletion() throws IOException {
|
||||
ApplicationId anotherAppId = BuilderUtils.newApplicationId(4567, 1);
|
||||
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId, 2);
|
||||
String user2 = "test_user2";
|
||||
File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
|
||||
String localLogDirsString = localLogDirs[0].getAbsolutePath() + ","
|
||||
+ localLogDirs[1].getAbsolutePath();
|
||||
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
|
||||
conf.setBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
|
||||
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 60 * 1000);
|
||||
conf.set(YarnConfiguration.NM_LOG_DELETE_THRESHOLD, "15g");
|
||||
|
||||
dirsHandler.init(conf);
|
||||
|
||||
NonAggregatingLogHandler rawLogHandler =
|
||||
new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler,
|
||||
new NMNullStateStoreService());
|
||||
NonAggregatingLogHandler logHandler = spy(rawLogHandler);
|
||||
AbstractFileSystem spylfs =
|
||||
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
||||
FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
||||
doReturn(lfs).when(logHandler)
|
||||
.getLocalFileContext(isA(Configuration.class));
|
||||
FsPermission defaultPermission =
|
||||
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
|
||||
FileStatus fs1 =
|
||||
new FileStatus(10 * 1024 * 1024 * 1024L, true, 1, 0,
|
||||
System.currentTimeMillis(), 0, defaultPermission, "", "",
|
||||
new Path(localLogDirs[0].getAbsolutePath()));
|
||||
FileStatus fs2 =
|
||||
new FileStatus(5 * 1024 * 1024 * 1024L, true, 1, 0,
|
||||
System.currentTimeMillis(), 0, defaultPermission, "", "",
|
||||
new Path(localLogDirs[0].getAbsolutePath()));
|
||||
Path path1 = new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
|
||||
Path path2 = new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
|
||||
Path path3 = new Path(localLogDirs[0].getAbsolutePath(), anotherAppId.toString());
|
||||
Path path4 = new Path(localLogDirs[1].getAbsolutePath(), anotherAppId.toString());
|
||||
|
||||
doReturn(fs1).when(spylfs).getFileStatus(eq(path1));
|
||||
doReturn(fs1).when(spylfs).getFileStatus(eq(path2));
|
||||
doReturn(fs2).when(spylfs).getFileStatus(eq(path3));
|
||||
doReturn(fs2).when(spylfs).getFileStatus(eq(path4));
|
||||
|
||||
logHandler.init(conf);
|
||||
logHandler.start();
|
||||
|
||||
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
|
||||
|
||||
logHandler.handle(new LogHandlerContainerFinishedEvent(container11,
|
||||
ContainerType.APPLICATION_MASTER, 0));
|
||||
|
||||
logHandler.handle(new LogHandlerAppFinishedEvent(appId));
|
||||
|
||||
logHandler.handle(new LogHandlerAppStartedEvent(anotherAppId, user2,
|
||||
null, null));
|
||||
|
||||
logHandler.handle(new LogHandlerContainerFinishedEvent(container22,
|
||||
ContainerType.APPLICATION_MASTER, 0));
|
||||
|
||||
logHandler.handle(new LogHandlerAppFinishedEvent(anotherAppId));
|
||||
|
||||
Path[] localAppLogDirs = new Path[]{path1, path2};
|
||||
Path[] anotherLocalAppLogDirs = new Path[]{path3, path4};
|
||||
|
||||
testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
|
||||
testDeletionServiceNeverCall(mockDelService, user2, 5000, anotherLocalAppLogDirs);
|
||||
|
||||
logHandler.close();
|
||||
for (int i = 0; i < localLogDirs.length; i++) {
|
||||
FileUtils.deleteDirectory(localLogDirs[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static void testDeletionServiceNeverCall(DeletionService delService, String user,
|
||||
long timeout, Path... matchPaths) {
|
||||
long verifyStartTime = System.currentTimeMillis();
|
||||
WantedButNotInvoked notInvokedException = null;
|
||||
boolean matched = false;
|
||||
while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
|
||||
try {
|
||||
verify(delService, never()).delete(argThat(new FileDeletionMatcher(
|
||||
delService, user, null, Arrays.asList(matchPaths))));
|
||||
matched = true;
|
||||
} catch (WantedButNotInvoked e) {
|
||||
notInvokedException = e;
|
||||
try {
|
||||
Thread.sleep(50l);
|
||||
} catch (InterruptedException i) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!matched) {
|
||||
throw notInvokedException;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user