MAPREDUCE-5386. Ability to refresh history server job retention and job cleaner settings. Contributed by Ashwin Shankar
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1507135 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b9b2647ebc
commit
8fa3ebd134
@ -149,6 +149,9 @@ Release 2.3.0 - UNRELEASED
|
||||
MAPREDUCE-5356. Ability to refresh aggregated log retention period and
|
||||
check interval (Ashwin Shankar via jlowe)
|
||||
|
||||
MAPREDUCE-5386. Ability to refresh history server job retention and job
|
||||
cleaner settings (Ashwin Shankar via jlowe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.mapred.JobACLsManager;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
|
||||
@ -61,6 +62,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
@ -416,7 +418,7 @@ synchronized Path getHistoryFile() {
|
||||
return historyFile;
|
||||
}
|
||||
|
||||
private synchronized void delete() throws IOException {
|
||||
protected synchronized void delete() throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("deleting " + historyFile + " and " + confFile);
|
||||
}
|
||||
@ -524,10 +526,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
|
||||
|
||||
jobListCache = new JobListCache(conf.getInt(
|
||||
JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
|
||||
maxHistoryAge);
|
||||
jobListCache = createJobListCache();
|
||||
|
||||
serialNumberIndex = new SerialNumberIndex(conf.getInt(
|
||||
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
|
||||
@ -544,6 +543,12 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
protected JobListCache createJobListCache() {
|
||||
return new JobListCache(conf.getInt(
|
||||
JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
|
||||
}
|
||||
|
||||
private void mkdir(FileContext fc, Path path, FsPermission fsp)
|
||||
throws IOException {
|
||||
if (!fc.util().exists(path)) {
|
||||
@ -656,18 +661,18 @@ private static List<FileStatus> scanDirectory(Path path, FileContext fc,
|
||||
return jhStatusList;
|
||||
}
|
||||
|
||||
private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
|
||||
protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
|
||||
FileContext fc) throws IOException {
|
||||
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Finds all history directories with a timestamp component by scanning the
|
||||
* filesystem. Used when the JobHistory server is started.
|
||||
*
|
||||
* @return
|
||||
* @return list of history directories
|
||||
*/
|
||||
private List<FileStatus> findTimestampedDirectories() throws IOException {
|
||||
protected List<FileStatus> findTimestampedDirectories() throws IOException {
|
||||
List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
|
||||
doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
|
||||
return fsList;
|
||||
@ -954,7 +959,7 @@ void clean() throws IOException {
|
||||
}
|
||||
}
|
||||
if (!halted) {
|
||||
doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
|
||||
deleteDir(serialDir);
|
||||
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
|
||||
existingDoneSubdirs.remove(serialDir.getPath());
|
||||
} else {
|
||||
@ -962,6 +967,13 @@ void clean() throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean deleteDir(FileStatus serialDir)
|
||||
throws AccessControlException, FileNotFoundException,
|
||||
UnsupportedFileSystemException, IOException {
|
||||
return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
|
||||
}
|
||||
|
||||
// for test
|
||||
@VisibleForTesting
|
||||
void setMaxHistoryAge(long newValue){
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
@ -71,7 +72,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
||||
|
||||
private HistoryStorage storage = null;
|
||||
private HistoryFileManager hsManager = null;
|
||||
|
||||
ScheduledFuture<?> futureHistoryCleaner = null;
|
||||
|
||||
//History job cleaner interval
|
||||
private long cleanerInterval;
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
LOG.info("JobHistory Init");
|
||||
@ -84,7 +89,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
|
||||
|
||||
hsManager = new HistoryFileManager();
|
||||
hsManager = createHistoryFileManager();
|
||||
hsManager.init(conf);
|
||||
try {
|
||||
hsManager.initExisting();
|
||||
@ -103,6 +108,10 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
protected HistoryFileManager createHistoryFileManager() {
|
||||
return new HistoryFileManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
hsManager.start();
|
||||
@ -118,19 +127,14 @@ protected void serviceStart() throws Exception {
|
||||
moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Start historyCleaner
|
||||
boolean startCleanerService = conf.getBoolean(
|
||||
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
|
||||
if (startCleanerService) {
|
||||
long runInterval = conf.getLong(
|
||||
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
|
||||
scheduledExecutor
|
||||
.scheduleAtFixedRate(new HistoryCleaner(),
|
||||
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
scheduleHistoryCleaner();
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
protected int getInitDelaySecs() {
|
||||
return 30;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Stopping JobHistory");
|
||||
@ -256,6 +260,43 @@ public JobsInfo getPartialJobs(Long offset, Long count, String user,
|
||||
fBegin, fEnd, jobState);
|
||||
}
|
||||
|
||||
public void refreshJobRetentionSettings() {
|
||||
if (getServiceState() == STATE.STARTED) {
|
||||
conf = createConf();
|
||||
long maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
|
||||
hsManager.setMaxHistoryAge(maxHistoryAge);
|
||||
if (futureHistoryCleaner != null) {
|
||||
futureHistoryCleaner.cancel(false);
|
||||
}
|
||||
futureHistoryCleaner = null;
|
||||
scheduleHistoryCleaner();
|
||||
} else {
|
||||
LOG.warn("Failed to execute refreshJobRetentionSettings : Job History service is not started");
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleHistoryCleaner() {
|
||||
boolean startCleanerService = conf.getBoolean(
|
||||
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
|
||||
if (startCleanerService) {
|
||||
cleanerInterval = conf.getLong(
|
||||
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
|
||||
|
||||
futureHistoryCleaner = scheduledExecutor.scheduleAtFixedRate(
|
||||
new HistoryCleaner(), getInitDelaySecs() * 1000l, cleanerInterval,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
protected Configuration createConf() {
|
||||
return new Configuration();
|
||||
}
|
||||
|
||||
public long getCleanerInterval() {
|
||||
return cleanerInterval;
|
||||
}
|
||||
// TODO AppContext - Not Required
|
||||
private ApplicationAttemptId appAttemptID;
|
||||
|
||||
|
@ -83,7 +83,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
clientService = new HistoryClientService(historyContext,
|
||||
this.jhsDTSecretManager);
|
||||
aggLogDelService = new AggregatedLogDeletionService();
|
||||
hsAdminServer = new HSAdminServer(aggLogDelService);
|
||||
hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
|
||||
addService(jobHistoryService);
|
||||
addService(clientService);
|
||||
addService(aggLogDelService);
|
||||
|
@ -60,8 +60,12 @@ private static void printUsage(String cmd) {
|
||||
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
|
||||
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
|
||||
System.err
|
||||
.println("Usage: mapred hsadmin [-refreshJobRetentionSettings]");
|
||||
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]");
|
||||
System.err
|
||||
.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]");
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]");
|
||||
} else {
|
||||
@ -69,6 +73,7 @@ private static void printUsage(String cmd) {
|
||||
System.err.println(" [-refreshUserToGroupsMappings]");
|
||||
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
|
||||
System.err.println(" [-refreshAdminAcls]");
|
||||
System.err.println(" [-refreshJobRetentionSettings]");
|
||||
System.err.println(" [-refreshLogRetentionSettings]");
|
||||
System.err.println(" [-getGroups [username]]");
|
||||
System.err.println(" [-help [cmd]]");
|
||||
@ -84,6 +89,8 @@ private static void printHelp(String cmd) {
|
||||
+ " [-refreshUserToGroupsMappings]"
|
||||
+ " [-refreshSuperUserGroupsConfiguration]"
|
||||
+ " [-refreshAdminAcls]"
|
||||
+ " [-refreshLogRetentionSettings]"
|
||||
+ " [-refreshJobRetentionSettings]"
|
||||
+ " [-getGroups [username]]" + " [-help [cmd]]\n";
|
||||
|
||||
String refreshUserToGroupsMappings = "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
|
||||
@ -92,8 +99,13 @@ private static void printHelp(String cmd) {
|
||||
|
||||
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
|
||||
|
||||
String refreshLogRetentionSettings = "-refreshLogRetentionSettings: Refresh 'log retention time' and 'log retention check interval' \n";
|
||||
String refreshJobRetentionSettings = "-refreshJobRetentionSettings:" +
|
||||
"Refresh job history period,job cleaner settings\n";
|
||||
|
||||
String refreshLogRetentionSettings = "-refreshLogRetentionSettings:" +
|
||||
"Refresh log retention period and log retention check interval\n";
|
||||
|
||||
|
||||
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
|
||||
@ -107,6 +119,8 @@ private static void printHelp(String cmd) {
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
} else if ("refreshAdminAcls".equals(cmd)) {
|
||||
System.out.println(refreshAdminAcls);
|
||||
} else if ("refreshJobRetentionSettings".equals(cmd)) {
|
||||
System.out.println(refreshJobRetentionSettings);
|
||||
} else if ("refreshLogRetentionSettings".equals(cmd)) {
|
||||
System.out.println(refreshLogRetentionSettings);
|
||||
} else if ("getGroups".equals(cmd)) {
|
||||
@ -116,6 +130,7 @@ private static void printHelp(String cmd) {
|
||||
System.out.println(refreshUserToGroupsMappings);
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
System.out.println(refreshAdminAcls);
|
||||
System.out.println(refreshJobRetentionSettings);
|
||||
System.out.println(refreshLogRetentionSettings);
|
||||
System.out.println(getGroups);
|
||||
System.out.println(help);
|
||||
@ -201,11 +216,27 @@ private int refreshAdminAcls() throws IOException {
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
// Refresh the user-to-groups mappings
|
||||
|
||||
refreshProtocol.refreshAdminAcls();
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshJobRetentionSettings() throws IOException {
|
||||
// Refresh job retention settings
|
||||
Configuration conf = getConf();
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
|
||||
refreshProtocol.refreshJobRetentionSettings();
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshLogRetentionSettings() throws IOException {
|
||||
// Refresh log retention settings
|
||||
Configuration conf = getConf();
|
||||
@ -214,14 +245,14 @@ private int refreshLogRetentionSettings() throws IOException {
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies
|
||||
.createProxy(conf, address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
|
||||
refreshProtocol.refreshLogRetentionSettings();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
@ -236,6 +267,7 @@ public int run(String[] args) throws Exception {
|
||||
if ("-refreshUserToGroupsMappings".equals(cmd)
|
||||
|| "-refreshSuperUserGroupsConfiguration".equals(cmd)
|
||||
|| "-refreshAdminAcls".equals(cmd)
|
||||
|| "-refreshJobRetentionSettings".equals(cmd)
|
||||
|| "-refreshLogRetentionSettings".equals(cmd)) {
|
||||
if (args.length != 1) {
|
||||
printUsage(cmd);
|
||||
@ -250,6 +282,8 @@ public int run(String[] args) throws Exception {
|
||||
exitCode = refreshSuperUserGroupsConfiguration();
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
exitCode = refreshAdminAcls();
|
||||
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
|
||||
exitCode = refreshJobRetentionSettings();
|
||||
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
|
||||
exitCode = refreshLogRetentionSettings();
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
|
@ -39,6 +39,13 @@ public interface HSAdminRefreshProtocol {
|
||||
* @throws IOException
|
||||
*/
|
||||
public void refreshAdminAcls() throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh job retention settings.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void refreshJobRetentionSettings() throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh log retention settings.
|
||||
|
@ -27,6 +27,7 @@
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcClientUtil;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
|
||||
@ -42,12 +43,18 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
||||
|
||||
private final HSAdminRefreshProtocolPB rpcProxy;
|
||||
|
||||
private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
|
||||
private final static RefreshAdminAclsRequestProto
|
||||
VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
private final static RefreshLogRetentionSettingsRequestProto VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = RefreshLogRetentionSettingsRequestProto
|
||||
.newBuilder().build();
|
||||
private final static RefreshJobRetentionSettingsRequestProto
|
||||
VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST =
|
||||
RefreshJobRetentionSettingsRequestProto.newBuilder().build();
|
||||
|
||||
private final static RefreshLogRetentionSettingsRequestProto
|
||||
VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST =
|
||||
RefreshLogRetentionSettingsRequestProto.newBuilder().build();
|
||||
|
||||
public HSAdminRefreshProtocolClientSideTranslatorPB(
|
||||
HSAdminRefreshProtocolPB rpcProxy) {
|
||||
this.rpcProxy = rpcProxy;
|
||||
@ -68,6 +75,16 @@ public void refreshAdminAcls() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshJobRetentionSettings() throws IOException {
|
||||
try {
|
||||
rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER,
|
||||
VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufHelper.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshLogRetentionSettings() throws IOException {
|
||||
try {
|
||||
@ -77,7 +94,7 @@ public void refreshLogRetentionSettings() throws IOException {
|
||||
throw ProtobufHelper.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isMethodSupported(String methodName) throws IOException {
|
||||
return RpcClientUtil.isMethodSupported(rpcProxy,
|
||||
|
@ -23,6 +23,8 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
@ -36,10 +38,17 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
|
||||
|
||||
private final HSAdminRefreshProtocol impl;
|
||||
|
||||
private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
|
||||
.newBuilder().build();
|
||||
private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto
|
||||
private final static RefreshAdminAclsResponseProto
|
||||
VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
|
||||
.newBuilder().build();
|
||||
|
||||
private final static RefreshJobRetentionSettingsResponseProto
|
||||
VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE =
|
||||
RefreshJobRetentionSettingsResponseProto.newBuilder().build();
|
||||
|
||||
private final static RefreshLogRetentionSettingsResponseProto
|
||||
VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE =
|
||||
RefreshLogRetentionSettingsResponseProto.newBuilder().build();
|
||||
|
||||
public HSAdminRefreshProtocolServerSideTranslatorPB(
|
||||
HSAdminRefreshProtocol impl) {
|
||||
@ -58,9 +67,23 @@ public RefreshAdminAclsResponseProto refreshAdminAcls(
|
||||
return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshJobRetentionSettingsResponseProto refreshJobRetentionSettings(
|
||||
RpcController controller,
|
||||
RefreshJobRetentionSettingsRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.refreshJobRetentionSettings();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshLogRetentionSettingsResponseProto refreshLogRetentionSettings(
|
||||
RpcController controller, RefreshLogRetentionSettingsRequestProto request)
|
||||
RpcController controller,
|
||||
RefreshLogRetentionSettingsRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.refreshLogRetentionSettings();
|
||||
|
@ -44,6 +44,7 @@
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolPB;
|
||||
@ -62,10 +63,13 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
||||
protected RPC.Server clientRpcServer;
|
||||
protected InetSocketAddress clientRpcAddress;
|
||||
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
|
||||
|
||||
public HSAdminServer(AggregatedLogDeletionService aggLogDelService) {
|
||||
private JobHistory jobHistoryService = null;
|
||||
|
||||
public HSAdminServer(AggregatedLogDeletionService aggLogDelService,
|
||||
JobHistory jobHistoryService) {
|
||||
super(HSAdminServer.class.getName());
|
||||
this.aggLogDelService = aggLogDelService;
|
||||
this.jobHistoryService = jobHistoryService;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -100,7 +104,8 @@ public void serviceInit(Configuration conf) throws Exception {
|
||||
.setPort(clientRpcAddress.getPort()).setVerbose(false).build();
|
||||
|
||||
addProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService);
|
||||
addProtocol(conf, HSAdminRefreshProtocolPB.class, refreshHSAdminProtocolService);
|
||||
addProtocol(conf, HSAdminRefreshProtocolPB.class,
|
||||
refreshHSAdminProtocolService);
|
||||
|
||||
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
|
||||
@ -196,7 +201,7 @@ public void refreshAdminAcls() throws IOException {
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
|
||||
HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void refreshLogRetentionSettings() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
|
||||
@ -206,4 +211,14 @@ public void refreshLogRetentionSettings() throws IOException {
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"refreshLogRetentionSettings", "HSAdminServer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshJobRetentionSettings() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshJobRetentionSettings");
|
||||
|
||||
jobHistoryService.refreshJobRetentionSettings();
|
||||
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"refreshJobRetentionSettings", HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,18 @@ message RefreshAdminAclsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* refresh job retention settings request.
|
||||
*/
|
||||
message RefreshJobRetentionSettingsRequestProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for refresh job retention.
|
||||
*/
|
||||
message RefreshJobRetentionSettingsResponseProto {
|
||||
}
|
||||
|
||||
/*
|
||||
* refresh log retention request.
|
||||
*/
|
||||
message RefreshLogRetentionSettingsRequestProto {
|
||||
@ -54,6 +66,13 @@ service HSAdminRefreshProtocolService {
|
||||
*/
|
||||
rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
|
||||
returns(RefreshAdminAclsResponseProto);
|
||||
|
||||
/**
|
||||
* Refresh job retention.
|
||||
*/
|
||||
rpc refreshJobRetentionSettings(RefreshJobRetentionSettingsRequestProto)
|
||||
returns(RefreshJobRetentionSettingsResponseProto);
|
||||
|
||||
/**
|
||||
* Refresh log retention
|
||||
*/
|
||||
|
@ -0,0 +1,156 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestJobHistory {
|
||||
|
||||
JobHistory jobHistory = null;
|
||||
|
||||
@Test
|
||||
public void testRefreshJobRetentionSettings() throws IOException,
|
||||
InterruptedException {
|
||||
String root = "mockfs://foo/";
|
||||
String historyDoneDir = root + "mapred/history/done";
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long someTimeYesterday = now - (25l * 3600 * 1000);
|
||||
long timeBefore200Secs = now - (200l * 1000);
|
||||
|
||||
// Get yesterday's date in YY/MM/DD format
|
||||
String timestampComponent = JobHistoryUtils
|
||||
.timestampDirectoryComponent(someTimeYesterday);
|
||||
|
||||
// Create a folder under yesterday's done dir
|
||||
Path donePathYesterday = new Path(historyDoneDir, timestampComponent + "/"
|
||||
+ "000000");
|
||||
FileStatus dirCreatedYesterdayStatus = new FileStatus(0, true, 0, 0,
|
||||
someTimeYesterday, donePathYesterday);
|
||||
|
||||
// Get today's date in YY/MM/DD format
|
||||
timestampComponent = JobHistoryUtils
|
||||
.timestampDirectoryComponent(timeBefore200Secs);
|
||||
|
||||
// Create a folder under today's done dir
|
||||
Path donePathToday = new Path(historyDoneDir, timestampComponent + "/"
|
||||
+ "000000");
|
||||
FileStatus dirCreatedTodayStatus = new FileStatus(0, true, 0, 0,
|
||||
timeBefore200Secs, donePathToday);
|
||||
|
||||
// Create a jhist file with yesterday's timestamp under yesterday's done dir
|
||||
Path fileUnderYesterdayDir = new Path(donePathYesterday.toString(),
|
||||
"job_1372363578825_0015-" + someTimeYesterday + "-user-Sleep+job-"
|
||||
+ someTimeYesterday + "-1-1-SUCCEEDED-default.jhist");
|
||||
FileStatus fileUnderYesterdayDirStatus = new FileStatus(10, false, 0, 0,
|
||||
someTimeYesterday, fileUnderYesterdayDir);
|
||||
|
||||
// Create a jhist file with today's timestamp under today's done dir
|
||||
Path fileUnderTodayDir = new Path(donePathYesterday.toString(),
|
||||
"job_1372363578825_0016-" + timeBefore200Secs + "-user-Sleep+job-"
|
||||
+ timeBefore200Secs + "-1-1-SUCCEEDED-default.jhist");
|
||||
FileStatus fileUnderTodayDirStatus = new FileStatus(10, false, 0, 0,
|
||||
timeBefore200Secs, fileUnderTodayDir);
|
||||
|
||||
HistoryFileManager historyManager = spy(new HistoryFileManager());
|
||||
jobHistory = spy(new JobHistory());
|
||||
|
||||
List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
|
||||
fileStatusList.add(dirCreatedYesterdayStatus);
|
||||
fileStatusList.add(dirCreatedTodayStatus);
|
||||
|
||||
// Make the initial delay of history job cleaner as 4 secs
|
||||
doReturn(4).when(jobHistory).getInitDelaySecs();
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
|
||||
List<FileStatus> list1 = new LinkedList<FileStatus>();
|
||||
list1.add(fileUnderYesterdayDirStatus);
|
||||
doReturn(list1).when(historyManager).scanDirectoryForHistoryFiles(
|
||||
eq(donePathYesterday), any(FileContext.class));
|
||||
|
||||
List<FileStatus> list2 = new LinkedList<FileStatus>();
|
||||
list2.add(fileUnderTodayDirStatus);
|
||||
doReturn(list2).when(historyManager).scanDirectoryForHistoryFiles(
|
||||
eq(donePathToday), any(FileContext.class));
|
||||
|
||||
doReturn(fileStatusList).when(historyManager).findTimestampedDirectories();
|
||||
doReturn(true).when(historyManager).deleteDir(any(FileStatus.class));
|
||||
|
||||
JobListCache jobListCache = mock(JobListCache.class);
|
||||
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
|
||||
doReturn(jobListCache).when(historyManager).createJobListCache();
|
||||
when(jobListCache.get(any(JobId.class))).thenReturn(fileInfo);
|
||||
|
||||
doNothing().when(fileInfo).delete();
|
||||
|
||||
// Set job retention time to 24 hrs and cleaner interval to 2 secs
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 24l * 3600 * 1000);
|
||||
conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 2 * 1000);
|
||||
|
||||
jobHistory.init(conf);
|
||||
|
||||
jobHistory.start();
|
||||
|
||||
assertEquals(2 * 1000l, jobHistory.getCleanerInterval());
|
||||
|
||||
// Only yesterday's jhist file should get deleted
|
||||
verify(fileInfo, timeout(20000).times(1)).delete();
|
||||
|
||||
fileStatusList.remove(dirCreatedYesterdayStatus);
|
||||
// Now reset job retention time to 10 secs
|
||||
conf.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 10 * 1000);
|
||||
// Set cleaner interval to 1 sec
|
||||
conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 1 * 1000);
|
||||
|
||||
doReturn(conf).when(jobHistory).createConf();
|
||||
// Do refresh job retention settings
|
||||
jobHistory.refreshJobRetentionSettings();
|
||||
|
||||
// Cleaner interval should be updated
|
||||
assertEquals(1 * 1000l, jobHistory.getCleanerInterval());
|
||||
// Today's jhist file will also be deleted now since it falls below the
|
||||
// retention threshold
|
||||
verify(fileInfo, timeout(20000).times(2)).delete();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
if (jobHistory != null) {
|
||||
jobHistory.stop();
|
||||
}
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||
@ -49,6 +50,7 @@ public class TestHSAdminServer {
|
||||
private HSAdmin hsAdminClient = null;
|
||||
Configuration conf = null;
|
||||
private static long groupRefreshTimeoutSec = 1;
|
||||
JobHistory jobHistoryService = null;
|
||||
AggregatedLogDeletionService alds = null;
|
||||
|
||||
public static class MockUnixGroupsMapping implements
|
||||
@ -85,9 +87,11 @@ public void init() throws HadoopIllegalArgumentException, IOException {
|
||||
GroupMappingServiceProvider.class);
|
||||
conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec);
|
||||
Groups.getUserToGroupsMappingService(conf);
|
||||
jobHistoryService = mock(JobHistory.class);
|
||||
alds = mock(AggregatedLogDeletionService.class);
|
||||
|
||||
hsAdminServer = new HSAdminServer(alds) {
|
||||
hsAdminServer = new HSAdminServer(alds, jobHistoryService) {
|
||||
|
||||
@Override
|
||||
protected Configuration createConf() {
|
||||
return conf;
|
||||
@ -236,13 +240,21 @@ public void testRefreshAdminAcls() throws Exception {
|
||||
}
|
||||
assertTrue(th instanceof RemoteException);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRefreshLogRetentionSettings() throws Exception {
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshLogRetentionSettings";
|
||||
hsAdminClient.run(args);
|
||||
verify(alds).refreshLogRetentionSettings();
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshLogRetentionSettings";
|
||||
hsAdminClient.run(args);
|
||||
verify(alds).refreshLogRetentionSettings();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshJobRetentionSettings() throws Exception {
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshJobRetentionSettings";
|
||||
hsAdminClient.run(args);
|
||||
verify(jobHistoryService).refreshJobRetentionSettings();
|
||||
}
|
||||
|
||||
@After
|
||||
|
Loading…
Reference in New Issue
Block a user