YARN-364. AggregatedLogDeletionService can take too long to delete logs. Contributed by Jason Lowe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1441239 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
71a57ded39
commit
f811198164
@ -325,6 +325,13 @@ Hadoop MapReduce Next Generation - Cluster Setup
|
|||||||
| | | How long to keep aggregation logs before deleting them. -1 disables. |
|
| | | How long to keep aggregation logs before deleting them. -1 disables. |
|
||||||
| | | Be careful, set this too small and you will spam the name node. |
|
| | | Be careful, set this too small and you will spam the name node. |
|
||||||
*-------------------------+-------------------------+------------------------+
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
| <<<yarn.log-aggregation.retain-check-interval-seconds>>> | | |
|
||||||
|
| | <-1> | |
|
||||||
|
| | | Time between checks for aggregated log retention. If set to 0 or a |
|
||||||
|
| | | negative value then the value is computed as one-tenth of the |
|
||||||
|
| | | aggregated log retention time. |
|
||||||
|
| | | Be careful, set this too small and you will spam the name node. |
|
||||||
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -284,6 +284,9 @@ Release 0.23.7 - UNRELEASED
|
|||||||
YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan
|
YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan
|
||||||
Gong via tgraves)
|
Gong via tgraves)
|
||||||
|
|
||||||
|
YARN-364. AggregatedLogDeletionService can take too long to delete logs
|
||||||
|
(jlowe)
|
||||||
|
|
||||||
Release 0.23.6 - UNRELEASED
|
Release 0.23.6 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -379,6 +379,15 @@ public class YarnConfiguration extends Configuration {
|
|||||||
+ "log-aggregation.retain-seconds";
|
+ "log-aggregation.retain-seconds";
|
||||||
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
|
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How long to wait between aggregated log retention checks. If set to
|
||||||
|
* a value <= 0 then the value is computed as one-tenth of the log retention
|
||||||
|
* setting. Be careful set this too small and you will spam the name node.
|
||||||
|
*/
|
||||||
|
public static final String LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS =
|
||||||
|
YARN_PREFIX + "log-aggregation.retain-check-interval-seconds";
|
||||||
|
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
|
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
|
||||||
* aggregation is disabled
|
* aggregation is disabled
|
||||||
|
@ -140,9 +140,16 @@ public void start() {
|
|||||||
" too small (" + retentionSecs + ")");
|
" too small (" + retentionSecs + ")");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
long checkIntervalMsecs = 1000 * conf.getLong(
|
||||||
|
YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||||
|
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
|
||||||
|
if (checkIntervalMsecs <= 0) {
|
||||||
|
// when unspecified compute check interval as 1/10th of retention
|
||||||
|
checkIntervalMsecs = (retentionSecs * 1000) / 10;
|
||||||
|
}
|
||||||
TimerTask task = new LogDeletionTask(conf, retentionSecs);
|
TimerTask task = new LogDeletionTask(conf, retentionSecs);
|
||||||
timer = new Timer();
|
timer = new Timer();
|
||||||
timer.scheduleAtFixedRate(task, 0, retentionSecs * 1000);
|
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,6 +410,15 @@
|
|||||||
<value>-1</value>
|
<value>-1</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>How long to wait between aggregated log retention checks.
|
||||||
|
If set to 0 or a negative value then the value is computed as one-tenth
|
||||||
|
of the aggregated log retention time. Be careful set this too small and
|
||||||
|
you will spam the name node.</description>
|
||||||
|
<name>yarn.log-aggregation.retain-check-interval-seconds</name>
|
||||||
|
<value>-1</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Time in seconds to retain user logs. Only applicable if
|
<description>Time in seconds to retain user logs. Only applicable if
|
||||||
log aggregation is disabled
|
log aggregation is disabled
|
||||||
|
@ -28,12 +28,19 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
public class TestAggregatedLogDeletionService {
|
public class TestAggregatedLogDeletionService {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void closeFilesystems() throws IOException {
|
||||||
|
// prevent the same mockfs instance from being reused due to FS cache
|
||||||
|
FileSystem.closeAll();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeletion() throws Exception {
|
public void testDeletion() throws Exception {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
@ -121,6 +128,70 @@ public void testDeletion() throws Exception {
|
|||||||
verify(mockFs).delete(app4Dir, true);
|
verify(mockFs).delete(app4Dir, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckInterval() throws Exception {
|
||||||
|
long RETENTION_SECS = 10 * 24 * 3600;
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
long toDeleteTime = now - RETENTION_SECS*1000;
|
||||||
|
|
||||||
|
String root = "mockfs://foo/";
|
||||||
|
String remoteRootLogDir = root+"tmp/logs";
|
||||||
|
String suffix = "logs";
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
|
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
||||||
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
|
||||||
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
|
||||||
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
||||||
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
||||||
|
|
||||||
|
// prevent us from picking up the same mockfs instance from another test
|
||||||
|
FileSystem.closeAll();
|
||||||
|
Path rootPath = new Path(root);
|
||||||
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||||
|
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||||
|
|
||||||
|
Path remoteRootLogPath = new Path(remoteRootLogDir);
|
||||||
|
|
||||||
|
Path userDir = new Path(remoteRootLogPath, "me");
|
||||||
|
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
|
||||||
|
|
||||||
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
||||||
|
new FileStatus[]{userDirStatus});
|
||||||
|
|
||||||
|
Path userLogDir = new Path(userDir, suffix);
|
||||||
|
Path app1Dir = new Path(userLogDir, "application_1_1");
|
||||||
|
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
|
||||||
|
|
||||||
|
when(mockFs.listStatus(userLogDir)).thenReturn(
|
||||||
|
new FileStatus[]{app1DirStatus});
|
||||||
|
|
||||||
|
Path app1Log1 = new Path(app1Dir, "host1");
|
||||||
|
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
|
||||||
|
|
||||||
|
when(mockFs.listStatus(app1Dir)).thenReturn(
|
||||||
|
new FileStatus[]{app1Log1Status});
|
||||||
|
|
||||||
|
AggregatedLogDeletionService deletionSvc =
|
||||||
|
new AggregatedLogDeletionService();
|
||||||
|
deletionSvc.init(conf);
|
||||||
|
deletionSvc.start();
|
||||||
|
|
||||||
|
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
|
||||||
|
verify(mockFs, never()).delete(app1Dir, true);
|
||||||
|
|
||||||
|
// modify the timestamp of the logs and verify it's picked up quickly
|
||||||
|
app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
|
||||||
|
app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
|
||||||
|
when(mockFs.listStatus(userLogDir)).thenReturn(
|
||||||
|
new FileStatus[]{app1DirStatus});
|
||||||
|
when(mockFs.listStatus(app1Dir)).thenReturn(
|
||||||
|
new FileStatus[]{app1Log1Status});
|
||||||
|
|
||||||
|
verify(mockFs, timeout(10000)).delete(app1Dir, true);
|
||||||
|
|
||||||
|
deletionSvc.stop();
|
||||||
|
}
|
||||||
|
|
||||||
static class MockFileSystem extends FilterFileSystem {
|
static class MockFileSystem extends FilterFileSystem {
|
||||||
MockFileSystem() {
|
MockFileSystem() {
|
||||||
|
Loading…
Reference in New Issue
Block a user