MAPREDUCE-5356. Ability to refresh aggregated log retention period and check interval. Contributed by Ashwin Shankar

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1506226 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-07-23 19:20:57 +00:00
parent 0939c49368
commit a42e459b9e
11 changed files with 271 additions and 33 deletions

View File

@ -146,6 +146,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5265. History server admin service to refresh user and superuser MAPREDUCE-5265. History server admin service to refresh user and superuser
group mappings (Ashwin Shankar via jlowe) group mappings (Ashwin Shankar via jlowe)
MAPREDUCE-5356. Ability to refresh aggregated log retention period and
check interval (Ashwin Shankar via jlowe)
IMPROVEMENTS IMPROVEMENTS
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -83,7 +83,7 @@ protected void serviceInit(Configuration conf) throws Exception {
clientService = new HistoryClientService(historyContext, clientService = new HistoryClientService(historyContext,
this.jhsDTSecretManager); this.jhsDTSecretManager);
aggLogDelService = new AggregatedLogDeletionService(); aggLogDelService = new AggregatedLogDeletionService();
hsAdminServer = new HSAdminServer(); hsAdminServer = new HSAdminServer(aggLogDelService);
addService(jobHistoryService); addService(jobHistoryService);
addService(clientService); addService(clientService);
addService(aggLogDelService); addService(aggLogDelService);

View File

@ -60,6 +60,8 @@ private static void printUsage(String cmd) {
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]"); .println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
} else if ("-refreshAdminAcls".equals(cmd)) { } else if ("-refreshAdminAcls".equals(cmd)) {
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]"); System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
System.err.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]");
} else if ("-getGroups".equals(cmd)) { } else if ("-getGroups".equals(cmd)) {
System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]"); System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]");
} else { } else {
@ -67,6 +69,7 @@ private static void printUsage(String cmd) {
System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]"); System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshLogRetentionSettings]");
System.err.println(" [-getGroups [username]]"); System.err.println(" [-getGroups [username]]");
System.err.println(" [-help [cmd]]"); System.err.println(" [-help [cmd]]");
System.err.println(); System.err.println();
@ -89,6 +92,8 @@ private static void printHelp(String cmd) {
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n"; String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
String refreshLogRetentionSettings = "-refreshLogRetentionSettings: Refresh 'log retention time' and 'log retention check interval' \n";
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n"; String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
@ -102,6 +107,8 @@ private static void printHelp(String cmd) {
System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshSuperUserGroupsConfiguration);
} else if ("refreshAdminAcls".equals(cmd)) { } else if ("refreshAdminAcls".equals(cmd)) {
System.out.println(refreshAdminAcls); System.out.println(refreshAdminAcls);
} else if ("refreshLogRetentionSettings".equals(cmd)) {
System.out.println(refreshLogRetentionSettings);
} else if ("getGroups".equals(cmd)) { } else if ("getGroups".equals(cmd)) {
System.out.println(getGroups); System.out.println(getGroups);
} else { } else {
@ -109,6 +116,7 @@ private static void printHelp(String cmd) {
System.out.println(refreshUserToGroupsMappings); System.out.println(refreshUserToGroupsMappings);
System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls); System.out.println(refreshAdminAcls);
System.out.println(refreshLogRetentionSettings);
System.out.println(getGroups); System.out.println(getGroups);
System.out.println(help); System.out.println(help);
System.out.println(); System.out.println();
@ -198,6 +206,22 @@ private int refreshAdminAcls() throws IOException {
return 0; return 0;
} }
private int refreshLogRetentionSettings() throws IOException {
// Refresh log retention settings
Configuration conf = getConf();
InetSocketAddress address = conf.getSocketAddr(
JHAdminConfig.JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
HSAdminRefreshProtocol refreshProtocol = HSProxies
.createProxy(conf, address, HSAdminRefreshProtocol.class,
UserGroupInformation.getCurrentUser());
refreshProtocol.refreshLogRetentionSettings();
return 0;
}
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
@ -211,7 +235,8 @@ public int run(String[] args) throws Exception {
if ("-refreshUserToGroupsMappings".equals(cmd) if ("-refreshUserToGroupsMappings".equals(cmd)
|| "-refreshSuperUserGroupsConfiguration".equals(cmd) || "-refreshSuperUserGroupsConfiguration".equals(cmd)
|| "-refreshAdminAcls".equals(cmd)) { || "-refreshAdminAcls".equals(cmd)
|| "-refreshLogRetentionSettings".equals(cmd)) {
if (args.length != 1) { if (args.length != 1) {
printUsage(cmd); printUsage(cmd);
return exitCode; return exitCode;
@ -225,6 +250,8 @@ public int run(String[] args) throws Exception {
exitCode = refreshSuperUserGroupsConfiguration(); exitCode = refreshSuperUserGroupsConfiguration();
} else if ("-refreshAdminAcls".equals(cmd)) { } else if ("-refreshAdminAcls".equals(cmd)) {
exitCode = refreshAdminAcls(); exitCode = refreshAdminAcls();
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
exitCode = refreshLogRetentionSettings();
} else if ("-getGroups".equals(cmd)) { } else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length); String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames); exitCode = getGroups(usernames);

View File

@ -40,4 +40,11 @@ public interface HSAdminRefreshProtocol {
*/ */
public void refreshAdminAcls() throws IOException; public void refreshAdminAcls() throws IOException;
/**
* Refresh log retention settings.
*
* @throws IOException
*/
public void refreshLogRetentionSettings() throws IOException;
} }

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -44,6 +45,9 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
.newBuilder().build(); .newBuilder().build();
private final static RefreshLogRetentionSettingsRequestProto VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = RefreshLogRetentionSettingsRequestProto
.newBuilder().build();
public HSAdminRefreshProtocolClientSideTranslatorPB( public HSAdminRefreshProtocolClientSideTranslatorPB(
HSAdminRefreshProtocolPB rpcProxy) { HSAdminRefreshProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy; this.rpcProxy = rpcProxy;
@ -64,6 +68,16 @@ public void refreshAdminAcls() throws IOException {
} }
} }
@Override
public void refreshLogRetentionSettings() throws IOException {
try {
rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER,
VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, return RpcClientUtil.isMethodSupported(rpcProxy,

View File

@ -23,6 +23,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -36,6 +38,8 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
.newBuilder().build(); .newBuilder().build();
private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto
.newBuilder().build();
public HSAdminRefreshProtocolServerSideTranslatorPB( public HSAdminRefreshProtocolServerSideTranslatorPB(
HSAdminRefreshProtocol impl) { HSAdminRefreshProtocol impl) {
@ -54,4 +58,15 @@ public RefreshAdminAclsResponseProto refreshAdminAcls(
return VOID_REFRESH_ADMIN_ACLS_RESPONSE; return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
} }
@Override
public RefreshLogRetentionSettingsResponseProto refreshLogRetentionSettings(
RpcController controller, RefreshLogRetentionSettingsRequestProto request)
throws ServiceException {
try {
impl.refreshLogRetentionSettings();
} catch (IOException e) {
throw new ServiceException(e);
}
return VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE;
}
} }

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService; import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
@ -55,14 +56,16 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
private static final Log LOG = LogFactory.getLog(HSAdminServer.class); private static final Log LOG = LogFactory.getLog(HSAdminServer.class);
private AccessControlList adminAcl; private AccessControlList adminAcl;
private AggregatedLogDeletionService aggLogDelService = null;
/** The RPC server that listens to requests from clients */ /** The RPC server that listens to requests from clients */
protected RPC.Server clientRpcServer; protected RPC.Server clientRpcServer;
protected InetSocketAddress clientRpcAddress; protected InetSocketAddress clientRpcAddress;
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer"; private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
public HSAdminServer() { public HSAdminServer(AggregatedLogDeletionService aggLogDelService) {
super(HSAdminServer.class.getName()); super(HSAdminServer.class.getName());
this.aggLogDelService = aggLogDelService;
} }
@Override @Override
@ -101,6 +104,7 @@ public void serviceInit(Configuration conf) throws Exception {
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL, adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL)); JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
} }
@Override @Override
@ -193,4 +197,13 @@ public void refreshAdminAcls() throws IOException {
HISTORY_ADMIN_SERVER); HISTORY_ADMIN_SERVER);
} }
@Override
public void refreshLogRetentionSettings() throws IOException {
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
aggLogDelService.refreshLogRetentionSettings();
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshLogRetentionSettings", "HSAdminServer");
}
} }

View File

@ -33,6 +33,18 @@ message RefreshAdminAclsRequestProto {
message RefreshAdminAclsResponseProto { message RefreshAdminAclsResponseProto {
} }
/**
* refresh log retention request.
*/
message RefreshLogRetentionSettingsRequestProto {
}
/**
* Response for refresh log retention.
*/
message RefreshLogRetentionSettingsResponseProto {
}
/** /**
* Refresh Protocols implemented by the History server * Refresh Protocols implemented by the History server
*/ */
@ -42,4 +54,9 @@ service HSAdminRefreshProtocolService {
*/ */
rpc refreshAdminAcls(RefreshAdminAclsRequestProto) rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
returns(RefreshAdminAclsResponseProto); returns(RefreshAdminAclsResponseProto);
/**
* Refresh log retention
*/
rpc refreshLogRetentionSettings(RefreshLogRetentionSettingsRequestProto)
returns(RefreshLogRetentionSettingsResponseProto);
} }

View File

@ -39,14 +39,17 @@
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
public class TestHSAdminServer { public class TestHSAdminServer {
private HSAdminServer hsAdminServer = null; private HSAdminServer hsAdminServer = null;
private HSAdmin hsAdminClient = null; private HSAdmin hsAdminClient = null;
Configuration conf = null; Configuration conf = null;
private static long groupRefreshTimeoutSec = 1; private static long groupRefreshTimeoutSec = 1;
AggregatedLogDeletionService alds = null;
public static class MockUnixGroupsMapping implements public static class MockUnixGroupsMapping implements
GroupMappingServiceProvider { GroupMappingServiceProvider {
@ -82,7 +85,9 @@ public void init() throws HadoopIllegalArgumentException, IOException {
GroupMappingServiceProvider.class); GroupMappingServiceProvider.class);
conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec); conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec);
Groups.getUserToGroupsMappingService(conf); Groups.getUserToGroupsMappingService(conf);
hsAdminServer = new HSAdminServer() { alds = mock(AggregatedLogDeletionService.class);
hsAdminServer = new HSAdminServer(alds) {
@Override @Override
protected Configuration createConf() { protected Configuration createConf() {
return conf; return conf;
@ -232,6 +237,14 @@ public void testRefreshAdminAcls() throws Exception {
assertTrue(th instanceof RemoteException); assertTrue(th instanceof RemoteException);
} }
@Test
public void testRefreshLogRetentionSettings() throws Exception {
String[] args = new String[1];
args[0] = "-refreshLogRetentionSettings";
hsAdminClient.run(args);
verify(alds).refreshLogRetentionSettings();
}
@After @After
public void cleanUp() { public void cleanUp() {
if (hsAdminServer != null) if (hsAdminServer != null)

View File

@ -41,6 +41,7 @@ public class AggregatedLogDeletionService extends AbstractService {
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
private Timer timer = null; private Timer timer = null;
private long checkIntervalMsecs;
static class LogDeletionTask extends TimerTask { static class LogDeletionTask extends TimerTask {
private Configuration conf; private Configuration conf;
@ -133,37 +134,71 @@ public AggregatedLogDeletionService() {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
Configuration conf = getConfig(); scheduleLogDeletionTask();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
//Log aggregation is not enabled so don't bother
return;
}
long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
if(retentionSecs < 0) {
LOG.info("Log Aggregation deletion is disabled because retention is" +
" too small (" + retentionSecs + ")");
return;
}
long checkIntervalMsecs = 1000 * conf.getLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
if (checkIntervalMsecs <= 0) {
// when unspecified compute check interval as 1/10th of retention
checkIntervalMsecs = (retentionSecs * 1000) / 10;
}
TimerTask task = new LogDeletionTask(conf, retentionSecs);
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
super.serviceStart(); super.serviceStart();
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if(timer != null) { stopTimer();
timer.cancel();
}
super.serviceStop(); super.serviceStop();
} }
private void setLogAggCheckIntervalMsecs(long retentionSecs) {
Configuration conf = getConfig();
checkIntervalMsecs = 1000 * conf
.getLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
if (checkIntervalMsecs <= 0) {
// when unspecified compute check interval as 1/10th of retention
checkIntervalMsecs = (retentionSecs * 1000) / 10;
}
}
public void refreshLogRetentionSettings() {
if (getServiceState() == STATE.STARTED) {
Configuration conf = createConf();
setConfig(conf);
stopTimer();
scheduleLogDeletionTask();
} else {
LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started");
}
}
private void scheduleLogDeletionTask() {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
// Log aggregation is not enabled so don't bother
return;
}
long retentionSecs = conf.getLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
if (retentionSecs < 0) {
LOG.info("Log Aggregation deletion is disabled because retention is"
+ " too small (" + retentionSecs + ")");
return;
}
setLogAggCheckIntervalMsecs(retentionSecs);
TimerTask task = new LogDeletionTask(conf, retentionSecs);
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
}
private void stopTimer() {
if (timer != null) {
timer.cancel();
}
}
public long getCheckIntervalMsecs() {
return checkIntervalMsecs;
}
protected Configuration createConf() {
return new Configuration();
}
} }

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.Assert;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -128,6 +129,99 @@ public void testDeletion() throws Exception {
verify(mockFs).delete(app4Dir, true); verify(mockFs).delete(app4Dir, true);
} }
@Test
public void testRefreshLogRetentionSettings() throws IOException {
long now = System.currentTimeMillis();
//time before 2000 sec
long before2000Secs = now - (2000 * 1000);
//time before 50 sec
long before50Secs = now - (50 * 1000);
String root = "mockfs://foo/";
String remoteRootLogDir = root + "tmp/logs";
String suffix = "logs";
final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
"1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
Path remoteRootLogPath = new Path(remoteRootLogDir);
Path userDir = new Path(remoteRootLogPath, "me");
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
userDir);
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[] { userDirStatus });
Path userLogDir = new Path(userDir, suffix);
//Set time last modified of app1Dir directory and its files to before2000Secs
Path app1Dir = new Path(userLogDir, "application_1_1");
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
app1Dir);
//Set time last modified of app1Dir directory and its files to before50Secs
Path app2Dir = new Path(userLogDir, "application_1_2");
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
app2Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[] { app1DirStatus, app2DirStatus });
Path app1Log1 = new Path(app1Dir, "host1");
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
app1Log1);
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[] { app1Log1Status });
Path app2Log1 = new Path(app2Dir, "host1");
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
app2Log1);
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[] { app2Log1Status });
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
@Override
protected Configuration createConf() {
return conf;
}
};
deletionSvc.init(conf);
deletionSvc.start();
//app1Dir would be deleted since its done above log retention period
verify(mockFs, timeout(10000)).delete(app1Dir, true);
//app2Dir is not expected to be deleted since its below the threshold
verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
//Now,lets change the confs
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
"2");
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
//refresh the log settings
deletionSvc.refreshLogRetentionSettings();
//Check interval time should reflect the new value
Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
//app2Dir should be deleted since it falls above the threshold
verify(mockFs, timeout(10000)).delete(app2Dir, true);
deletionSvc.stop();
}
@Test @Test
public void testCheckInterval() throws Exception { public void testCheckInterval() throws Exception {
long RETENTION_SECS = 10 * 24 * 3600; long RETENTION_SECS = 10 * 24 * 3600;