MAPREDUCE-5411. Refresh size of loaded job cache on history server. Contributed by Ashwin Shankar

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1508220 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-07-29 22:33:45 +00:00
parent c23cf3cddf
commit 8bb035509e
11 changed files with 289 additions and 9 deletions

View File

@ -152,6 +152,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5386. Ability to refresh history server job retention and job
cleaner settings (Ashwin Shankar via jlowe)
MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin
Shankar via jlowe)
IMPROVEMENTS
OPTIMIZATIONS

View File

@ -40,6 +40,8 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
/**
* Manages an in memory cache of parsed Job History files.
*/
@ -58,12 +60,16 @@ public void setHistoryFileManager(HistoryFileManager hsManager) {
this.hsManager = hsManager;
}
@SuppressWarnings("serial")
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
LOG.info("CachedHistoryStorage Init");
createLoadedJobCache(conf);
}
@SuppressWarnings("serial")
private void createLoadedJobCache(Configuration conf) {
loadedJobCacheSize = conf.getInt(
JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
@ -76,11 +82,25 @@ public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
}
});
}
/**
 * Rebuilds the loaded-job cache using a freshly created Configuration so
 * that an updated MR_HISTORY_LOADED_JOB_CACHE_SIZE value takes effect.
 * Only valid once this service has been STARTED; otherwise a warning is
 * logged and the cache is left untouched.
 */
public void refreshLoadedJobCache() {
if (getServiceState() == STATE.STARTED) {
// Re-read configuration from scratch (createConf is a test seam), then
// recreate the LRU cache with the newly configured size.
setConfig(createConf());
createLoadedJobCache(getConfig());
} else {
LOG.warn("Failed to execute refreshLoadedJobCache: CachedHistoryStorage is not started");
}
}
@VisibleForTesting
Configuration createConf() {
// Factory seam: tests stub this out (via spy/doReturn) to hand the
// storage a pre-populated Configuration during a cache refresh.
Configuration freshConf = new Configuration();
return freshConf;
}
/**
 * Creates the storage service; the actual job cache is built later in
 * serviceInit once a Configuration is available.
 */
public CachedHistoryStorage() {
super(CachedHistoryStorage.class.getName());
}
private Job loadJob(HistoryFileInfo fileInfo) {
try {
Job job = fileInfo.loadJob();
@ -98,6 +118,11 @@ private Job loadJob(HistoryFileInfo fileInfo) {
}
}
/**
 * Exposes the internal job cache so tests can inspect its contents and
 * verify eviction behavior after a refresh.
 */
@VisibleForTesting
Map<JobId, Job> getLoadedJobCache() {
return loadedJobCache;
}
@Override
public Job getFullJob(JobId jobId) {
if (LOG.isDebugEnabled()) {

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@ -97,9 +98,8 @@ protected void serviceInit(Configuration conf) throws Exception {
throw new YarnRuntimeException("Failed to intialize existing directories", e);
}
storage = ReflectionUtils.newInstance(conf.getClass(
JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
HistoryStorage.class), conf);
storage = createHistoryStorage();
if (storage instanceof Service) {
((Service) storage).init(conf);
}
@ -108,6 +108,12 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
/**
 * Instantiates the HistoryStorage implementation named by
 * JHAdminConfig.MR_HISTORY_STORAGE, defaulting to CachedHistoryStorage.
 * Protected so tests can override it to inject a custom storage.
 */
protected HistoryStorage createHistoryStorage() {
// NOTE(review): reads the enclosing class's 'conf' field, which is not
// visible in this hunk -- presumably assigned during serviceInit; confirm
// it is set before this factory is invoked.
return ReflectionUtils.newInstance(conf.getClass(
JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
HistoryStorage.class), conf);
}
/**
 * Factory for the HistoryFileManager; protected so tests can override it
 * and supply a mock.
 */
protected HistoryFileManager createHistoryFileManager() {
return new HistoryFileManager();
}
@ -229,6 +235,25 @@ public Map<JobId, Job> getAllJobs() {
return storage.getAllPartialJobs();
}
/**
 * Delegates a loaded-job-cache refresh to the underlying storage.
 * Throws UnsupportedOperationException when the configured storage is not
 * a CachedHistoryStorage (no cache to refresh); logs a warning and does
 * nothing if this service has not been started.
 */
public void refreshLoadedJobCache() {
if (getServiceState() == STATE.STARTED) {
if (storage instanceof CachedHistoryStorage) {
((CachedHistoryStorage) storage).refreshLoadedJobCache();
} else {
// Surface a hard error so the admin command fails visibly instead of
// silently ignoring the refresh request.
throw new UnsupportedOperationException(storage.getClass().getName()
+ " is expected to be an instance of "
+ CachedHistoryStorage.class.getName());
}
} else {
LOG.warn("Failed to execute refreshLoadedJobCache: JobHistory service is not started");
}
}
/**
 * Exposes the active HistoryStorage instance for test assertions.
 */
@VisibleForTesting
HistoryStorage getHistoryStorage() {
return storage;
}
/**
* Look for a set of partial jobs.
*

View File

@ -60,6 +60,8 @@ private static void printUsage(String cmd) {
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
} else if ("-refreshAdminAcls".equals(cmd)) {
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
} else if ("-refreshLoadedJobCache".equals(cmd)) {
System.err.println("Usage: mapred hsadmin [-refreshLoadedJobCache]");
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
System.err
.println("Usage: mapred hsadmin [-refreshJobRetentionSettings]");
@ -73,6 +75,7 @@ private static void printUsage(String cmd) {
System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshLoadedJobCache]");
System.err.println(" [-refreshJobRetentionSettings]");
System.err.println(" [-refreshLogRetentionSettings]");
System.err.println(" [-getGroups [username]]");
@ -89,6 +92,7 @@ private static void printHelp(String cmd) {
+ " [-refreshUserToGroupsMappings]"
+ " [-refreshSuperUserGroupsConfiguration]"
+ " [-refreshAdminAcls]"
+ " [-refreshLoadedJobCache]"
+ " [-refreshLogRetentionSettings]"
+ " [-refreshJobRetentionSettings]"
+ " [-getGroups [username]]" + " [-help [cmd]]\n";
@ -99,13 +103,14 @@ private static void printHelp(String cmd) {
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
String refreshLoadedJobCache = "-refreshLoadedJobCache: Refresh loaded job cache of Job history server\n";
String refreshJobRetentionSettings = "-refreshJobRetentionSettings:" +
"Refresh job history period,job cleaner settings\n";
String refreshLogRetentionSettings = "-refreshLogRetentionSettings:" +
"Refresh log retention period and log retention check interval\n";
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
@ -119,6 +124,8 @@ private static void printHelp(String cmd) {
System.out.println(refreshSuperUserGroupsConfiguration);
} else if ("refreshAdminAcls".equals(cmd)) {
System.out.println(refreshAdminAcls);
} else if ("refreshLoadedJobCache".equals(cmd)) {
System.out.println(refreshLoadedJobCache);
} else if ("refreshJobRetentionSettings".equals(cmd)) {
System.out.println(refreshJobRetentionSettings);
} else if ("refreshLogRetentionSettings".equals(cmd)) {
@ -130,6 +137,7 @@ private static void printHelp(String cmd) {
System.out.println(refreshUserToGroupsMappings);
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls);
System.out.println(refreshLoadedJobCache);
System.out.println(refreshJobRetentionSettings);
System.out.println(refreshLogRetentionSettings);
System.out.println(getGroups);
@ -221,6 +229,22 @@ private int refreshAdminAcls() throws IOException {
return 0;
}
/**
 * Connects to the history server's admin RPC address and asks it to
 * refresh the loaded job cache.
 *
 * @return 0 on success
 * @throws IOException if the proxy cannot be created or the RPC fails
 */
private int refreshLoadedJobCache() throws IOException {
// Refresh the loaded job cache
Configuration conf = getConf();
// Resolve the JHS admin endpoint from configuration (with defaults).
InetSocketAddress address = conf.getSocketAddr(
JHAdminConfig.JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
address, HSAdminRefreshProtocol.class,
UserGroupInformation.getCurrentUser());
refreshProtocol.refreshLoadedJobCache();
return 0;
}
private int refreshJobRetentionSettings() throws IOException {
// Refresh job retention settings
Configuration conf = getConf();
@ -267,6 +291,7 @@ public int run(String[] args) throws Exception {
if ("-refreshUserToGroupsMappings".equals(cmd)
|| "-refreshSuperUserGroupsConfiguration".equals(cmd)
|| "-refreshAdminAcls".equals(cmd)
|| "-refreshLoadedJobCache".equals(cmd)
|| "-refreshJobRetentionSettings".equals(cmd)
|| "-refreshLogRetentionSettings".equals(cmd)) {
if (args.length != 1) {
@ -282,6 +307,8 @@ public int run(String[] args) throws Exception {
exitCode = refreshSuperUserGroupsConfiguration();
} else if ("-refreshAdminAcls".equals(cmd)) {
exitCode = refreshAdminAcls();
} else if ("-refreshLoadedJobCache".equals(cmd)) {
exitCode = refreshLoadedJobCache();
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
exitCode = refreshJobRetentionSettings();
} else if ("-refreshLogRetentionSettings".equals(cmd)) {

View File

@ -40,6 +40,12 @@ public interface HSAdminRefreshProtocol {
*/
public void refreshAdminAcls() throws IOException;
/**
 * Refresh the loaded job cache of the history server.
 *
 * @throws IOException if the refresh cannot be performed
 */
public void refreshLoadedJobCache() throws IOException;
/**
* Refresh job retention settings.
*

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
@ -46,6 +47,10 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
private final static RefreshAdminAclsRequestProto
VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
.newBuilder().build();
private final static RefreshLoadedJobCacheRequestProto
VOID_REFRESH_LOADED_JOB_CACHE_REQUEST = RefreshLoadedJobCacheRequestProto
.newBuilder().build();
private final static RefreshJobRetentionSettingsRequestProto
VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST =
@ -75,6 +80,17 @@ public void refreshAdminAcls() throws IOException {
}
}
// Client-side translation: forwards the call as a protobuf RPC and
// unwraps ServiceException back into the IOException the protocol declares.
@Override
public void refreshLoadedJobCache() throws IOException {
try {
rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER,
VOID_REFRESH_LOADED_JOB_CACHE_REQUEST);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override
public void refreshJobRetentionSettings() throws IOException {
try {
@ -101,4 +117,5 @@ public boolean isMethodSupported(String methodName) throws IOException {
HSAdminRefreshProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(HSAdminRefreshProtocolPB.class), methodName);
}
}

View File

@ -23,6 +23,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
@ -41,11 +43,15 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
private final static RefreshAdminAclsResponseProto
VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
.newBuilder().build();
private final static RefreshLoadedJobCacheResponseProto
VOID_REFRESH_LOADED_JOB_CACHE_RESPONSE = RefreshLoadedJobCacheResponseProto
.newBuilder().build();
private final static RefreshJobRetentionSettingsResponseProto
VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE =
RefreshJobRetentionSettingsResponseProto.newBuilder().build();
private final static RefreshLogRetentionSettingsResponseProto
VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE =
RefreshLogRetentionSettingsResponseProto.newBuilder().build();
@ -67,6 +73,18 @@ public RefreshAdminAclsResponseProto refreshAdminAcls(
return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
}
// Server-side translation: invokes the real implementation and wraps any
// IOException in a ServiceException as the protobuf RPC layer requires.
@Override
public RefreshLoadedJobCacheResponseProto refreshLoadedJobCache(
RpcController controller, RefreshLoadedJobCacheRequestProto request)
throws ServiceException {
try {
impl.refreshLoadedJobCache();
} catch (IOException e) {
throw new ServiceException(e);
}
return VOID_REFRESH_LOADED_JOB_CACHE_RESPONSE;
}
@Override
public RefreshJobRetentionSettingsResponseProto refreshJobRetentionSettings(
RpcController controller,

View File

@ -202,6 +202,22 @@ public void refreshAdminAcls() throws IOException {
HISTORY_ADMIN_SERVER);
}
/**
 * Admin RPC entry point: checks admin ACLs, delegates the refresh to the
 * JobHistory service, and writes an audit log entry for the outcome.
 * An UnsupportedOperationException from the service (non-cached storage)
 * is audited as a failure and rethrown.
 */
@Override
public void refreshLoadedJobCache() throws IOException {
UserGroupInformation user = checkAcls("refreshLoadedJobCache");
try {
jobHistoryService.refreshLoadedJobCache();
} catch (UnsupportedOperationException e) {
HSAuditLogger.logFailure(user.getShortUserName(),
"refreshLoadedJobCache", adminAcl.toString(), HISTORY_ADMIN_SERVER,
e.getMessage());
throw e;
}
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshLoadedJobCache",
HISTORY_ADMIN_SERVER);
}
@Override
public void refreshLogRetentionSettings() throws IOException {
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");

View File

@ -33,6 +33,18 @@ message RefreshAdminAclsRequestProto {
message RefreshAdminAclsResponseProto {
}
/**
 * Refresh loaded job cache request (carries no parameters).
 */
message RefreshLoadedJobCacheRequestProto {
}
/**
 * Response for refresh loaded job cache (empty acknowledgement).
 */
message RefreshLoadedJobCacheResponseProto {
}
/**
* refresh job retention settings request.
*/
@ -66,6 +78,12 @@ service HSAdminRefreshProtocolService {
*/
rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
returns(RefreshAdminAclsResponseProto);
/**
 * Refresh the loaded job cache on the history server.
 */
rpc refreshLoadedJobCache(RefreshLoadedJobCacheRequestProto)
returns(RefreshLoadedJobCacheResponseProto);
/**
* Refresh job retention.

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.util.Map;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@ -27,8 +29,10 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.junit.After;
@ -36,11 +40,76 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestJobHistory {
JobHistory jobHistory = null;
// Verifies that refreshLoadedJobCache() re-reads the configured cache size:
// with a cache of 2, loading 3 jobs evicts the eldest; after bumping the
// config to 3 and refreshing, all 3 jobs fit.
@Test
public void testRefreshLoadedJobCache() throws Exception {
HistoryFileManager historyManager = mock(HistoryFileManager.class);
jobHistory = spy(new JobHistory());
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
Configuration conf = new Configuration();
// Set the cache size to 2
conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
jobHistory.init(conf);
jobHistory.start();
// Spy on the storage so createConf() can be stubbed for the refresh below.
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
.getHistoryStorage());
Job[] jobs = new Job[3];
JobId[] jobIds = new JobId[3];
for (int i = 0; i < 3; i++) {
jobs[i] = mock(Job.class);
jobIds[i] = mock(JobId.class);
when(jobs[i].getID()).thenReturn(jobIds[i]);
}
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
.thenReturn(jobs[2]);
// getFullJob will put the job in the cache if it isn't there
for (int i = 0; i < 3; i++) {
storage.getFullJob(jobs[i].getID());
}
Map<JobId, Job> jobCache = storage.getLoadedJobCache();
// job0 should have been purged since cache size is 2
assertFalse(jobCache.containsKey(jobs[0].getID()));
assertTrue(jobCache.containsKey(jobs[1].getID())
&& jobCache.containsKey(jobs[2].getID()));
// Setting cache size to 3
conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
doReturn(conf).when(storage).createConf();
// Re-prime the loadJob stubbing since the refreshed cache starts empty.
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
.thenReturn(jobs[2]);
jobHistory.refreshLoadedJobCache();
for (int i = 0; i < 3; i++) {
storage.getFullJob(jobs[i].getID());
}
jobCache = storage.getLoadedJobCache();
// All three jobs should be in cache since its size is now 3
for (int i = 0; i < 3; i++) {
assertTrue(jobCache.containsKey(jobs[i].getID()));
}
}
@Test
public void testRefreshJobRetentionSettings() throws IOException,
InterruptedException {
@ -147,6 +216,54 @@ public void testRefreshJobRetentionSettings() throws IOException,
verify(fileInfo, timeout(20000).times(2)).delete();
}
// Verifies that refreshLoadedJobCache() throws UnsupportedOperationException
// when the configured HistoryStorage is not a CachedHistoryStorage.
@Test
public void testRefreshLoadedJobCacheUnSupportedOperation() {
jobHistory = spy(new JobHistory());
// Minimal non-cached storage stub; all methods are intentionally no-ops.
HistoryStorage storage = new HistoryStorage() {
@Override
public void setHistoryFileManager(HistoryFileManager hsManager) {
// no-op: not needed for this test
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
// no-op: not needed for this test
return null;
}
@Override
public Job getFullJob(JobId jobId) {
// no-op: not needed for this test
return null;
}
@Override
public Map<JobId, Job> getAllPartialJobs() {
// no-op: not needed for this test
return null;
}
};
doReturn(storage).when(jobHistory).createHistoryStorage();
jobHistory.init(new Configuration());
jobHistory.start();
Throwable th = null;
try {
jobHistory.refreshLoadedJobCache();
} catch (Exception e) {
th = e;
}
assertTrue(th instanceof UnsupportedOperationException);
}
@After
public void cleanUp() {
if (jobHistory != null) {

View File

@ -241,6 +241,14 @@ public void testRefreshAdminAcls() throws Exception {
assertTrue(th instanceof RemoteException);
}
// Verifies the hsadmin CLI flag -refreshLoadedJobCache is wired through to
// the history service's refreshLoadedJobCache().
@Test
public void testRefreshLoadedJobCache() throws Exception {
String[] args = new String[1];
args[0] = "-refreshLoadedJobCache";
hsAdminClient.run(args);
verify(jobHistoryService).refreshLoadedJobCache();
}
@Test
public void testRefreshLogRetentionSettings() throws Exception {
String[] args = new String[1];