diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index fcdab29a1e..beee09e06d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -149,6 +149,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5356. Ability to refresh aggregated log retention period and check interval (Ashwin Shankar via jlowe) + MAPREDUCE-5386. Ability to refresh history server job retention and job + cleaner settings (Ashwin Shankar via jlowe) + IMPROVEMENTS OPTIMIZATIONS diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 4f39794fb9..46abee8b89 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapreduce.jobhistory.JobSummary; @@ -61,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -416,7 +418,7 @@ synchronized Path getHistoryFile() { return historyFile; } - 
private synchronized void delete() throws IOException { + protected synchronized void delete() throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("deleting " + historyFile + " and " + confFile); } @@ -524,10 +526,7 @@ protected void serviceInit(Configuration conf) throws Exception { maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); - jobListCache = new JobListCache(conf.getInt( - JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, - JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), - maxHistoryAge); + jobListCache = createJobListCache(); serialNumberIndex = new SerialNumberIndex(conf.getInt( JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, @@ -544,6 +543,12 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } + protected JobListCache createJobListCache() { + return new JobListCache(conf.getInt( + JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, + JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); + } + private void mkdir(FileContext fc, Path path, FsPermission fsp) throws IOException { if (!fc.util().exists(path)) { @@ -656,18 +661,18 @@ private static List scanDirectory(Path path, FileContext fc, return jhStatusList; } - private static List scanDirectoryForHistoryFiles(Path path, + protected List scanDirectoryForHistoryFiles(Path path, FileContext fc) throws IOException { return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); } - + /** * Finds all history directories with a timestamp component by scanning the * filesystem. Used when the JobHistory server is started. 
* - * @return + * @return list of history directories */ - private List findTimestampedDirectories() throws IOException { + protected List findTimestampedDirectories() throws IOException { List fsList = JobHistoryUtils.localGlobber(doneDirFc, doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); return fsList; @@ -954,7 +959,7 @@ void clean() throws IOException { } } if (!halted) { - doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); + deleteDir(serialDir); removeDirectoryFromSerialNumberIndex(serialDir.getPath()); existingDoneSubdirs.remove(serialDir.getPath()); } else { @@ -962,6 +967,13 @@ void clean() throws IOException { } } } + + protected boolean deleteDir(FileStatus serialDir) + throws AccessControlException, FileNotFoundException, + UnsupportedFileSystemException, IOException { + return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); + } + // for test @VisibleForTesting void setMaxHistoryAge(long newValue){ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 2c1f3a26ff..a316159ce8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -71,7 +72,11 @@ public class JobHistory extends AbstractService implements HistoryContext { private HistoryStorage storage = null; private 
HistoryFileManager hsManager = null; - + ScheduledFuture futureHistoryCleaner = null; + + //History job cleaner interval + private long cleanerInterval; + @Override protected void serviceInit(Configuration conf) throws Exception { LOG.info("JobHistory Init"); @@ -84,7 +89,7 @@ protected void serviceInit(Configuration conf) throws Exception { JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS); - hsManager = new HistoryFileManager(); + hsManager = createHistoryFileManager(); hsManager.init(conf); try { hsManager.initExisting(); @@ -103,6 +108,10 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } + protected HistoryFileManager createHistoryFileManager() { + return new HistoryFileManager(); + } + @Override protected void serviceStart() throws Exception { hsManager.start(); @@ -118,19 +127,14 @@ protected void serviceStart() throws Exception { moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS); // Start historyCleaner - boolean startCleanerService = conf.getBoolean( - JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true); - if (startCleanerService) { - long runInterval = conf.getLong( - JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, - JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS); - scheduledExecutor - .scheduleAtFixedRate(new HistoryCleaner(), - 30 * 1000l, runInterval, TimeUnit.MILLISECONDS); - } + scheduleHistoryCleaner(); super.serviceStart(); } + protected int getInitDelaySecs() { + return 30; + } + @Override protected void serviceStop() throws Exception { LOG.info("Stopping JobHistory"); @@ -256,6 +260,43 @@ public JobsInfo getPartialJobs(Long offset, Long count, String user, fBegin, fEnd, jobState); } + public void refreshJobRetentionSettings() { + if (getServiceState() == STATE.STARTED) { + conf = createConf(); + long maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, + JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); + 
hsManager.setMaxHistoryAge(maxHistoryAge); + if (futureHistoryCleaner != null) { + futureHistoryCleaner.cancel(false); + } + futureHistoryCleaner = null; + scheduleHistoryCleaner(); + } else { + LOG.warn("Failed to execute refreshJobRetentionSettings : Job History service is not started"); + } + } + + private void scheduleHistoryCleaner() { + boolean startCleanerService = conf.getBoolean( + JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true); + if (startCleanerService) { + cleanerInterval = conf.getLong( + JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, + JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS); + + futureHistoryCleaner = scheduledExecutor.scheduleAtFixedRate( + new HistoryCleaner(), getInitDelaySecs() * 1000l, cleanerInterval, + TimeUnit.MILLISECONDS); + } + } + + protected Configuration createConf() { + return new Configuration(); + } + + public long getCleanerInterval() { + return cleanerInterval; + } // TODO AppContext - Not Required private ApplicationAttemptId appAttemptID; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index 01d5b5d2ee..d6f509f144 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -83,7 +83,7 @@ protected void serviceInit(Configuration conf) throws Exception { clientService = new HistoryClientService(historyContext, this.jhsDTSecretManager); aggLogDelService = new AggregatedLogDeletionService(); - hsAdminServer = new HSAdminServer(aggLogDelService); + hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService); 
addService(jobHistoryService); addService(clientService); addService(aggLogDelService); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java index dc3c89663e..87baaf0e55 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java @@ -60,8 +60,12 @@ private static void printUsage(String cmd) { .println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]"); } else if ("-refreshAdminAcls".equals(cmd)) { System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]"); + } else if ("-refreshJobRetentionSettings".equals(cmd)) { + System.err + .println("Usage: mapred hsadmin [-refreshJobRetentionSettings]"); } else if ("-refreshLogRetentionSettings".equals(cmd)) { - System.err.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]"); + System.err + .println("Usage: mapred hsadmin [-refreshLogRetentionSettings]"); } else if ("-getGroups".equals(cmd)) { System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]"); } else { @@ -69,6 +73,7 @@ private static void printUsage(String cmd) { System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshAdminAcls]"); + System.err.println(" [-refreshJobRetentionSettings]"); System.err.println(" [-refreshLogRetentionSettings]"); System.err.println(" [-getGroups [username]]"); System.err.println(" [-help [cmd]]"); @@ -84,6 +89,8 @@ private static void printHelp(String cmd) { + " [-refreshUserToGroupsMappings]" + " 
[-refreshSuperUserGroupsConfiguration]" + " [-refreshAdminAcls]" + + " [-refreshLogRetentionSettings]" + + " [-refreshJobRetentionSettings]" + " [-getGroups [username]]" + " [-help [cmd]]\n"; String refreshUserToGroupsMappings = "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n"; @@ -92,8 +99,13 @@ private static void printHelp(String cmd) { String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n"; - String refreshLogRetentionSettings = "-refreshLogRetentionSettings: Refresh 'log retention time' and 'log retention check interval' \n"; + String refreshJobRetentionSettings = "-refreshJobRetentionSettings:" + + "Refresh job history period,job cleaner settings\n"; + + String refreshLogRetentionSettings = "-refreshLogRetentionSettings:" + + "Refresh log retention period and log retention check interval\n"; + String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n"; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" @@ -107,6 +119,8 @@ private static void printHelp(String cmd) { System.out.println(refreshSuperUserGroupsConfiguration); } else if ("refreshAdminAcls".equals(cmd)) { System.out.println(refreshAdminAcls); + } else if ("refreshJobRetentionSettings".equals(cmd)) { + System.out.println(refreshJobRetentionSettings); } else if ("refreshLogRetentionSettings".equals(cmd)) { System.out.println(refreshLogRetentionSettings); } else if ("getGroups".equals(cmd)) { @@ -116,6 +130,7 @@ private static void printHelp(String cmd) { System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshAdminAcls); + System.out.println(refreshJobRetentionSettings); System.out.println(refreshLogRetentionSettings); System.out.println(getGroups); System.out.println(help); @@ -201,11 +216,27 @@ private int refreshAdminAcls() throws IOException { HSAdminRefreshProtocol refreshProtocol = 
HSProxies.createProxy(conf, address, HSAdminRefreshProtocol.class, UserGroupInformation.getCurrentUser()); - // Refresh the user-to-groups mappings + refreshProtocol.refreshAdminAcls(); return 0; } + private int refreshJobRetentionSettings() throws IOException { + // Refresh job retention settings + Configuration conf = getConf(); + InetSocketAddress address = conf.getSocketAddr( + JHAdminConfig.JHS_ADMIN_ADDRESS, + JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS, + JHAdminConfig.DEFAULT_JHS_ADMIN_PORT); + + HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf, + address, HSAdminRefreshProtocol.class, + UserGroupInformation.getCurrentUser()); + + refreshProtocol.refreshJobRetentionSettings(); + return 0; + } + private int refreshLogRetentionSettings() throws IOException { // Refresh log retention settings Configuration conf = getConf(); @@ -214,14 +245,14 @@ private int refreshLogRetentionSettings() throws IOException { JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS, JHAdminConfig.DEFAULT_JHS_ADMIN_PORT); - HSAdminRefreshProtocol refreshProtocol = HSProxies - .createProxy(conf, address, HSAdminRefreshProtocol.class, - UserGroupInformation.getCurrentUser()); + HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf, + address, HSAdminRefreshProtocol.class, + UserGroupInformation.getCurrentUser()); refreshProtocol.refreshLogRetentionSettings(); return 0; } - + @Override public int run(String[] args) throws Exception { if (args.length < 1) { @@ -236,6 +267,7 @@ public int run(String[] args) throws Exception { if ("-refreshUserToGroupsMappings".equals(cmd) || "-refreshSuperUserGroupsConfiguration".equals(cmd) || "-refreshAdminAcls".equals(cmd) + || "-refreshJobRetentionSettings".equals(cmd) || "-refreshLogRetentionSettings".equals(cmd)) { if (args.length != 1) { printUsage(cmd); @@ -250,6 +282,8 @@ public int run(String[] args) throws Exception { exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-refreshAdminAcls".equals(cmd)) { exitCode = 
refreshAdminAcls(); + } else if ("-refreshJobRetentionSettings".equals(cmd)) { + exitCode = refreshJobRetentionSettings(); } else if ("-refreshLogRetentionSettings".equals(cmd)) { exitCode = refreshLogRetentionSettings(); } else if ("-getGroups".equals(cmd)) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java index ed661d1bf3..ea6b15cab0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java @@ -39,6 +39,13 @@ public interface HSAdminRefreshProtocol { * @throws IOException */ public void refreshAdminAcls() throws IOException; + + /** + * Refresh job retention settings. + * + * @throws IOException + */ + public void refreshJobRetentionSettings() throws IOException; /** * Refresh log retention settings. 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java index e210b2b7d4..3d39a1b152 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; @@ -42,12 +43,18 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements private final HSAdminRefreshProtocolPB rpcProxy; - private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto + private final static RefreshAdminAclsRequestProto + VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto .newBuilder().build(); - private final static RefreshLogRetentionSettingsRequestProto VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = RefreshLogRetentionSettingsRequestProto - .newBuilder().build(); + private final static RefreshJobRetentionSettingsRequestProto + VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST = + 
RefreshJobRetentionSettingsRequestProto.newBuilder().build(); + private final static RefreshLogRetentionSettingsRequestProto + VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = + RefreshLogRetentionSettingsRequestProto.newBuilder().build(); + public HSAdminRefreshProtocolClientSideTranslatorPB( HSAdminRefreshProtocolPB rpcProxy) { this.rpcProxy = rpcProxy; @@ -68,6 +75,16 @@ public void refreshAdminAcls() throws IOException { } } + @Override + public void refreshJobRetentionSettings() throws IOException { + try { + rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER, + VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST); + } catch (ServiceException se) { + throw ProtobufHelper.getRemoteException(se); + } + } + @Override public void refreshLogRetentionSettings() throws IOException { try { @@ -77,7 +94,7 @@ public void refreshLogRetentionSettings() throws IOException { throw ProtobufHelper.getRemoteException(se); } } - + @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java index eaf395cb13..a59b4a8ded 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import 
org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; @@ -36,10 +38,17 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements private final HSAdminRefreshProtocol impl; - private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto - .newBuilder().build(); - private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto + private final static RefreshAdminAclsResponseProto + VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto .newBuilder().build(); + + private final static RefreshJobRetentionSettingsResponseProto + VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE = + RefreshJobRetentionSettingsResponseProto.newBuilder().build(); + + private final static RefreshLogRetentionSettingsResponseProto + VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = + RefreshLogRetentionSettingsResponseProto.newBuilder().build(); public HSAdminRefreshProtocolServerSideTranslatorPB( HSAdminRefreshProtocol impl) { @@ -58,9 +67,23 @@ public RefreshAdminAclsResponseProto refreshAdminAcls( return VOID_REFRESH_ADMIN_ACLS_RESPONSE; } + @Override + public RefreshJobRetentionSettingsResponseProto refreshJobRetentionSettings( + RpcController 
controller, + RefreshJobRetentionSettingsRequestProto request) + throws ServiceException { + try { + impl.refreshJobRetentionSettings(); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE; + } + @Override public RefreshLogRetentionSettingsResponseProto refreshLogRetentionSettings( - RpcController controller, RefreshLogRetentionSettingsRequestProto request) + RpcController controller, + RefreshLogRetentionSettingsRequestProto request) throws ServiceException { try { impl.refreshLogRetentionSettings(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index 444a8617d4..ad8e7201a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger; import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger.AuditConstants; +import org.apache.hadoop.mapreduce.v2.hs.JobHistory; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminProtocol; import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolPB; @@ -62,10 +63,13 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { protected RPC.Server clientRpcServer; protected InetSocketAddress clientRpcAddress; private static final String 
HISTORY_ADMIN_SERVER = "HSAdminServer"; - - public HSAdminServer(AggregatedLogDeletionService aggLogDelService) { + private JobHistory jobHistoryService = null; + + public HSAdminServer(AggregatedLogDeletionService aggLogDelService, + JobHistory jobHistoryService) { super(HSAdminServer.class.getName()); this.aggLogDelService = aggLogDelService; + this.jobHistoryService = jobHistoryService; } @Override @@ -100,7 +104,8 @@ public void serviceInit(Configuration conf) throws Exception { .setPort(clientRpcAddress.getPort()).setVerbose(false).build(); addProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService); - addProtocol(conf, HSAdminRefreshProtocolPB.class, refreshHSAdminProtocolService); + addProtocol(conf, HSAdminRefreshProtocolPB.class, + refreshHSAdminProtocolService); adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL, JHAdminConfig.DEFAULT_JHS_ADMIN_ACL)); @@ -196,7 +201,7 @@ public void refreshAdminAcls() throws IOException { HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls", HISTORY_ADMIN_SERVER); } - + @Override public void refreshLogRetentionSettings() throws IOException { UserGroupInformation user = checkAcls("refreshLogRetentionSettings"); @@ -206,4 +211,14 @@ public void refreshLogRetentionSettings() throws IOException { HSAuditLogger.logSuccess(user.getShortUserName(), "refreshLogRetentionSettings", "HSAdminServer"); } + + @Override + public void refreshJobRetentionSettings() throws IOException { + UserGroupInformation user = checkAcls("refreshJobRetentionSettings"); + + jobHistoryService.refreshJobRetentionSettings(); + + HSAuditLogger.logSuccess(user.getShortUserName(), + "refreshJobRetentionSettings", HISTORY_ADMIN_SERVER); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto index 
e9d2f30689..eb968e7920 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto @@ -34,6 +34,18 @@ message RefreshAdminAclsResponseProto { } /** + * refresh job retention settings request. + */ +message RefreshJobRetentionSettingsRequestProto { +} + +/** + * Response for refresh job retention. + */ +message RefreshJobRetentionSettingsResponseProto { +} + +/* * refresh log retention request. */ message RefreshLogRetentionSettingsRequestProto { @@ -54,6 +66,13 @@ service HSAdminRefreshProtocolService { */ rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns(RefreshAdminAclsResponseProto); + + /** + * Refresh job retention. + */ + rpc refreshJobRetentionSettings(RefreshJobRetentionSettingsRequestProto) + returns(RefreshJobRetentionSettingsResponseProto); + /** * Refresh log retention */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java new file mode 100644 index 0000000000..365afd66f6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class TestJobHistory { + + JobHistory jobHistory = null; + + @Test + public void testRefreshJobRetentionSettings() throws IOException, + InterruptedException { + String root = "mockfs://foo/"; + String historyDoneDir = root + "mapred/history/done"; + + long now = System.currentTimeMillis(); + long someTimeYesterday = now - (25l * 3600 * 1000); + long timeBefore200Secs = now - (200l * 1000); + + // Get yesterday's date in YY/MM/DD format + String timestampComponent = JobHistoryUtils + .timestampDirectoryComponent(someTimeYesterday); + + // Create a folder under yesterday's done dir + Path donePathYesterday = new Path(historyDoneDir, timestampComponent + "/" + + "000000"); + FileStatus dirCreatedYesterdayStatus = new FileStatus(0, true, 0, 0, + someTimeYesterday, 
donePathYesterday); + + // Get today's date in YY/MM/DD format + timestampComponent = JobHistoryUtils + .timestampDirectoryComponent(timeBefore200Secs); + + // Create a folder under today's done dir + Path donePathToday = new Path(historyDoneDir, timestampComponent + "/" + + "000000"); + FileStatus dirCreatedTodayStatus = new FileStatus(0, true, 0, 0, + timeBefore200Secs, donePathToday); + + // Create a jhist file with yesterday's timestamp under yesterday's done dir + Path fileUnderYesterdayDir = new Path(donePathYesterday.toString(), + "job_1372363578825_0015-" + someTimeYesterday + "-user-Sleep+job-" + + someTimeYesterday + "-1-1-SUCCEEDED-default.jhist"); + FileStatus fileUnderYesterdayDirStatus = new FileStatus(10, false, 0, 0, + someTimeYesterday, fileUnderYesterdayDir); + + // Create a jhist file with today's timestamp under today's done dir + Path fileUnderTodayDir = new Path(donePathYesterday.toString(), + "job_1372363578825_0016-" + timeBefore200Secs + "-user-Sleep+job-" + + timeBefore200Secs + "-1-1-SUCCEEDED-default.jhist"); + FileStatus fileUnderTodayDirStatus = new FileStatus(10, false, 0, 0, + timeBefore200Secs, fileUnderTodayDir); + + HistoryFileManager historyManager = spy(new HistoryFileManager()); + jobHistory = spy(new JobHistory()); + + List fileStatusList = new LinkedList(); + fileStatusList.add(dirCreatedYesterdayStatus); + fileStatusList.add(dirCreatedTodayStatus); + + // Make the initial delay of history job cleaner as 4 secs + doReturn(4).when(jobHistory).getInitDelaySecs(); + doReturn(historyManager).when(jobHistory).createHistoryFileManager(); + + List list1 = new LinkedList(); + list1.add(fileUnderYesterdayDirStatus); + doReturn(list1).when(historyManager).scanDirectoryForHistoryFiles( + eq(donePathYesterday), any(FileContext.class)); + + List list2 = new LinkedList(); + list2.add(fileUnderTodayDirStatus); + doReturn(list2).when(historyManager).scanDirectoryForHistoryFiles( + eq(donePathToday), any(FileContext.class)); + + 
doReturn(fileStatusList).when(historyManager).findTimestampedDirectories(); + doReturn(true).when(historyManager).deleteDir(any(FileStatus.class)); + + JobListCache jobListCache = mock(JobListCache.class); + HistoryFileInfo fileInfo = mock(HistoryFileInfo.class); + doReturn(jobListCache).when(historyManager).createJobListCache(); + when(jobListCache.get(any(JobId.class))).thenReturn(fileInfo); + + doNothing().when(fileInfo).delete(); + + // Set job retention time to 24 hrs and cleaner interval to 2 secs + Configuration conf = new Configuration(); + conf.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 24l * 3600 * 1000); + conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 2 * 1000); + + jobHistory.init(conf); + + jobHistory.start(); + + assertEquals(2 * 1000l, jobHistory.getCleanerInterval()); + + // Only yesterday's jhist file should get deleted + verify(fileInfo, timeout(20000).times(1)).delete(); + + fileStatusList.remove(dirCreatedYesterdayStatus); + // Now reset job retention time to 10 secs + conf.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 10 * 1000); + // Set cleaner interval to 1 sec + conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 1 * 1000); + + doReturn(conf).when(jobHistory).createConf(); + // Do refresh job retention settings + jobHistory.refreshJobRetentionSettings(); + + // Cleaner interval should be updated + assertEquals(1 * 1000l, jobHistory.getCleanerInterval()); + // Today's jhist file will also be deleted now since it falls below the + // retention threshold + verify(fileInfo, timeout(20000).times(2)).delete(); + } + + @After + public void cleanUp() { + if (jobHistory != null) { + jobHistory.stop(); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java 
index c530c19497..2474f575d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.mapreduce.v2.hs.JobHistory; import org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.security.GroupMappingServiceProvider; @@ -49,6 +50,7 @@ public class TestHSAdminServer { private HSAdmin hsAdminClient = null; Configuration conf = null; private static long groupRefreshTimeoutSec = 1; + JobHistory jobHistoryService = null; /* mocked JobHistory service; created in init() and passed to the HSAdminServer constructor */ AggregatedLogDeletionService alds = null; public static class MockUnixGroupsMapping implements @@ -85,9 +87,11 @@ public void init() throws HadoopIllegalArgumentException, IOException { GroupMappingServiceProvider.class); conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec); Groups.getUserToGroupsMappingService(conf); + jobHistoryService = mock(JobHistory.class); alds = mock(AggregatedLogDeletionService.class); - hsAdminServer = new HSAdminServer(alds) { + hsAdminServer = new HSAdminServer(alds, jobHistoryService) { + @Override protected Configuration createConf() { return conf; @@ -236,13 +240,21 @@ public void testRefreshAdminAcls() throws Exception { } assertTrue(th instanceof RemoteException); } - + @Test public void testRefreshLogRetentionSettings() throws Exception { - String[] args = new String[1]; - args[0] = "-refreshLogRetentionSettings"; - hsAdminClient.run(args); - verify(alds).refreshLogRetentionSettings(); + String[] args = new String[1]; + args[0] = "-refreshLogRetentionSettings"; +
hsAdminClient.run(args); + verify(alds).refreshLogRetentionSettings(); + } + /** Runs the HSAdmin client with "-refreshJobRetentionSettings" and verifies that refreshJobRetentionSettings() is invoked on the mocked JobHistory service. */ + @Test + public void testRefreshJobRetentionSettings() throws Exception { + String[] args = new String[1]; + args[0] = "-refreshJobRetentionSettings"; + hsAdminClient.run(args); + verify(jobHistoryService).refreshJobRetentionSettings(); } @After