MAPREDUCE-6622. Add capability to set JHS job cache to a task-based limit (rchiang via rkanter)
This commit is contained in:
parent
d1d4e16690
commit
0f72da7e28
@ -322,6 +322,9 @@ Release 2.9.0 - UNRELEASED
|
||||
MAPREDUCE-6640. mapred job -history command should be able to take
|
||||
Job ID (rkanter)
|
||||
|
||||
MAPREDUCE-6622. Add capability to set JHS job cache to a
|
||||
task-based limit (rchiang via rkanter)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -107,6 +107,10 @@ public class JHAdminConfig {
|
||||
MR_HISTORY_PREFIX + "loadedjobs.cache.size";
|
||||
public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
|
||||
|
||||
/** Size of the loaded job cache (in tasks).*/
|
||||
public static final String MR_HISTORY_LOADED_TASKS_CACHE_SIZE =
|
||||
MR_HISTORY_PREFIX + "loadedtasks.cache.size";
|
||||
|
||||
/**
|
||||
* The maximum age of a job history file before it is deleted from the history
|
||||
* server.
|
||||
|
@ -1656,7 +1656,32 @@
|
||||
<property>
|
||||
<name>mapreduce.jobhistory.loadedjobs.cache.size</name>
|
||||
<value>5</value>
|
||||
<description>Size of the loaded job cache</description>
|
||||
<description>Size of the loaded job cache. This property is ignored if
|
||||
the property mapreduce.jobhistory.loadedtasks.cache.size is set to a
|
||||
positive value.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.jobhistory.loadedtasks.cache.size</name>
|
||||
<value></value>
|
||||
<description>Change the job history cache limit to be set in terms
|
||||
of total task count. If the total number of tasks loaded exceeds
|
||||
this value, then the job cache will be shrunk down until it is
|
||||
under this limit (minimum 1 job in cache). If this value is empty
|
||||
or nonpositive then the cache reverts to using the property
|
||||
mapreduce.jobhistory.loadedjobs.cache.size as a job cache size.
|
||||
|
||||
Two recommendations for the mapreduce.jobhistory.loadedtasks.cache.size
|
||||
property:
|
||||
1) For every 100k of cache size, set the heap size of the Job History
|
||||
Server to 1.2GB. For example,
|
||||
mapreduce.jobhistory.loadedtasks.cache.size=500000, heap size=6GB.
|
||||
2) Make sure that the cache size is larger than the number of tasks
|
||||
required for the largest job run on the cluster. It might be a good
|
||||
idea to set the value slightly higher (say, 20%) in order to allow
|
||||
for job size growth.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
|
@ -20,12 +20,16 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.Weigher;
|
||||
import com.google.common.util.concurrent.UncheckedExecutionException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -49,9 +53,10 @@ public class CachedHistoryStorage extends AbstractService implements
|
||||
HistoryStorage {
|
||||
private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);
|
||||
|
||||
private Map<JobId, Job> loadedJobCache = null;
|
||||
// The number of loaded jobs.
|
||||
private LoadingCache<JobId, Job> loadedJobCache = null;
|
||||
private int loadedJobCacheSize;
|
||||
private int loadedTasksCacheSize;
|
||||
private boolean useLoadedTasksCache;
|
||||
|
||||
private HistoryFileManager hsManager;
|
||||
|
||||
@ -70,17 +75,70 @@ public void serviceInit(Configuration conf) throws Exception {
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private void createLoadedJobCache(Configuration conf) {
|
||||
// Set property for old "loaded jobs" cache
|
||||
loadedJobCacheSize = conf.getInt(
|
||||
JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
|
||||
|
||||
loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
|
||||
loadedJobCacheSize + 1, 0.75f, true) {
|
||||
@Override
|
||||
public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
|
||||
return super.size() > loadedJobCacheSize;
|
||||
// Check property for new "loaded tasks" cache perform sanity checking
|
||||
useLoadedTasksCache = false;
|
||||
try {
|
||||
String taskSizeString = conf
|
||||
.get(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE);
|
||||
if (taskSizeString != null) {
|
||||
loadedTasksCacheSize = Math.max(Integer.parseInt(taskSizeString), 1);
|
||||
useLoadedTasksCache = true;
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
LOG.error("The property " +
|
||||
JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE +
|
||||
" is not an integer value. Please set it to a positive" +
|
||||
" integer value.");
|
||||
}
|
||||
|
||||
CacheLoader<JobId, Job> loader;
|
||||
loader = new CacheLoader<JobId, Job>() {
|
||||
@Override
|
||||
public Job load(JobId key) throws Exception {
|
||||
return loadJob(key);
|
||||
}
|
||||
};
|
||||
|
||||
if (!useLoadedTasksCache) {
|
||||
loadedJobCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(loadedJobCacheSize)
|
||||
.initialCapacity(loadedJobCacheSize)
|
||||
.concurrencyLevel(1)
|
||||
.build(loader);
|
||||
} else {
|
||||
Weigher<JobId, Job> weightByTasks;
|
||||
weightByTasks = new Weigher<JobId, Job>() {
|
||||
/**
|
||||
* Method for calculating Job weight by total task count. If
|
||||
* the total task count is greater than the size of the tasks
|
||||
* cache, then cap it at the cache size. This allows the cache
|
||||
* to always hold one large job.
|
||||
* @param key JobId object
|
||||
* @param value Job object
|
||||
* @return Weight of the job as calculated by total task count
|
||||
*/
|
||||
@Override
|
||||
public int weigh(JobId key, Job value) {
|
||||
int taskCount = Math.min(loadedTasksCacheSize,
|
||||
value.getTotalMaps() + value.getTotalReduces());
|
||||
return taskCount;
|
||||
}
|
||||
};
|
||||
// Keep concurrencyLevel at 1. Otherwise, two problems:
|
||||
// 1) The largest job that can be initially loaded is
|
||||
// cache size / 4.
|
||||
// 2) Unit tests are not deterministic.
|
||||
loadedJobCache = CacheBuilder.newBuilder()
|
||||
.maximumWeight(loadedTasksCacheSize)
|
||||
.weigher(weightByTasks)
|
||||
.concurrencyLevel(1)
|
||||
.build(loader);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void refreshLoadedJobCache() {
|
||||
@ -101,51 +159,47 @@ public CachedHistoryStorage() {
|
||||
super(CachedHistoryStorage.class.getName());
|
||||
}
|
||||
|
||||
private Job loadJob(HistoryFileInfo fileInfo) {
|
||||
try {
|
||||
Job job = fileInfo.loadJob();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding " + job.getID() + " to loaded job cache");
|
||||
private static class HSFileRuntimeException extends RuntimeException {
|
||||
public HSFileRuntimeException(String message) {
|
||||
super(message);
|
||||
}
|
||||
// We can clobber results here, but that should be OK, because it only
|
||||
// means that we may have two identical copies of the same job floating
|
||||
// around for a while.
|
||||
loadedJobCache.put(job.getID(), job);
|
||||
return job;
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not find/load job: " + fileInfo.getJobId(), e);
|
||||
}
|
||||
|
||||
private Job loadJob(JobId jobId) throws RuntimeException, IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Looking for Job " + jobId);
|
||||
}
|
||||
HistoryFileInfo fileInfo;
|
||||
|
||||
fileInfo = hsManager.getFileInfo(jobId);
|
||||
if (fileInfo == null) {
|
||||
throw new HSFileRuntimeException("Unable to find job " + jobId);
|
||||
} else if (fileInfo.isDeleted()) {
|
||||
throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
|
||||
} else {
|
||||
return fileInfo.loadJob();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<JobId, Job> getLoadedJobCache() {
|
||||
Cache<JobId, Job> getLoadedJobCache() {
|
||||
return loadedJobCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Job getFullJob(JobId jobId) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Looking for Job " + jobId);
|
||||
}
|
||||
Job retVal = null;
|
||||
try {
|
||||
HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
|
||||
Job result = null;
|
||||
if (fileInfo != null) {
|
||||
result = loadedJobCache.get(jobId);
|
||||
if (result == null) {
|
||||
result = loadJob(fileInfo);
|
||||
} else if(fileInfo.isDeleted()) {
|
||||
loadedJobCache.remove(jobId);
|
||||
result = null;
|
||||
}
|
||||
retVal = loadedJobCache.getUnchecked(jobId);
|
||||
} catch (UncheckedExecutionException e) {
|
||||
if (e.getCause() instanceof HSFileRuntimeException) {
|
||||
LOG.error(e.getCause().getMessage());
|
||||
return null;
|
||||
} else {
|
||||
loadedJobCache.remove(jobId);
|
||||
throw new YarnRuntimeException(e.getCause());
|
||||
}
|
||||
return result;
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -243,4 +297,14 @@ public static JobsInfo getPartialJobs(Collection<Job> jobs, Long offset,
|
||||
}
|
||||
return allJobs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean getUseLoadedTasksCache() {
|
||||
return useLoadedTasksCache;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getLoadedTasksCacheSize() {
|
||||
return loadedTasksCacheSize;
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -35,16 +36,27 @@
|
||||
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.*;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
|
||||
public class TestJobHistory {
|
||||
|
||||
@ -58,13 +70,15 @@ public void testRefreshLoadedJobCache() throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
// Set the cache size to 2
|
||||
conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 2);
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertFalse(storage.getUseLoadedTasksCache());
|
||||
|
||||
Job[] jobs = new Job[3];
|
||||
JobId[] jobIds = new JobId[3];
|
||||
|
||||
@ -84,14 +98,13 @@ public void testRefreshLoadedJobCache() throws Exception {
|
||||
storage.getFullJob(jobs[i].getID());
|
||||
}
|
||||
|
||||
Map<JobId, Job> jobCache = storage.getLoadedJobCache();
|
||||
// job0 should have been purged since cache size is 2
|
||||
assertFalse(jobCache.containsKey(jobs[0].getID()));
|
||||
assertTrue(jobCache.containsKey(jobs[1].getID())
|
||||
&& jobCache.containsKey(jobs[2].getID()));
|
||||
Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
|
||||
// Verify some jobs are stored in the cache. Hard to predict eviction
|
||||
// in Guava version.
|
||||
assertTrue(jobCache.size() > 0);
|
||||
|
||||
// Setting cache size to 3
|
||||
conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 3);
|
||||
doReturn(conf).when(storage).createConf();
|
||||
|
||||
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
|
||||
@ -105,9 +118,223 @@ public void testRefreshLoadedJobCache() throws Exception {
|
||||
|
||||
jobCache = storage.getLoadedJobCache();
|
||||
|
||||
// All three jobs should be in cache since its size is now 3
|
||||
for (int i = 0; i < 3; i++) {
|
||||
assertTrue(jobCache.containsKey(jobs[i].getID()));
|
||||
// Verify some jobs are stored in the cache. Hard to predict eviction
|
||||
// in Guava version.
|
||||
assertTrue(jobCache.size() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTasksCacheLimit() throws Exception {
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
// Set the cache threshold to 50 tasks
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertTrue(storage.getUseLoadedTasksCache());
|
||||
assertEquals(storage.getLoadedTasksCacheSize(), 50);
|
||||
|
||||
// Create a bunch of smaller jobs (<< 50 tasks)
|
||||
Job[] jobs = new Job[10];
|
||||
JobId[] jobIds = new JobId[10];
|
||||
for (int i = 0; i < jobs.length; i++) {
|
||||
jobs[i] = mock(Job.class);
|
||||
jobIds[i] = mock(JobId.class);
|
||||
when(jobs[i].getID()).thenReturn(jobIds[i]);
|
||||
when(jobs[i].getTotalMaps()).thenReturn(10);
|
||||
when(jobs[i].getTotalReduces()).thenReturn(2);
|
||||
}
|
||||
|
||||
// Create some large jobs that forces task-based cache flushing
|
||||
Job[] lgJobs = new Job[3];
|
||||
JobId[] lgJobIds = new JobId[3];
|
||||
for (int i = 0; i < lgJobs.length; i++) {
|
||||
lgJobs[i] = mock(Job.class);
|
||||
lgJobIds[i] = mock(JobId.class);
|
||||
when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
|
||||
when(lgJobs[i].getTotalMaps()).thenReturn(2000);
|
||||
when(lgJobs[i].getTotalReduces()).thenReturn(10);
|
||||
}
|
||||
|
||||
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
|
||||
when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
|
||||
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
|
||||
.thenReturn(jobs[2]).thenReturn(jobs[3]).thenReturn(jobs[4])
|
||||
.thenReturn(jobs[5]).thenReturn(jobs[6]).thenReturn(jobs[7])
|
||||
.thenReturn(jobs[8]).thenReturn(jobs[9]).thenReturn(lgJobs[0])
|
||||
.thenReturn(lgJobs[1]).thenReturn(lgJobs[2]);
|
||||
|
||||
// getFullJob will put the job in the cache if it isn't there
|
||||
Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
|
||||
for (int i = 0; i < jobs.length; i++) {
|
||||
storage.getFullJob(jobs[i].getID());
|
||||
}
|
||||
long prevSize = jobCache.size();
|
||||
|
||||
// Fill the cache with some larger jobs and verify the cache
|
||||
// gets reduced in size.
|
||||
for (int i = 0; i < lgJobs.length; i++) {
|
||||
storage.getFullJob(lgJobs[i].getID());
|
||||
}
|
||||
assertTrue(jobCache.size() < prevSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobCacheLimitLargerThanMax() throws Exception {
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
JobHistory jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
// Set the cache threshold to 50 tasks
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 500);
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertTrue(storage.getUseLoadedTasksCache());
|
||||
assertEquals(storage.getLoadedTasksCacheSize(), 500);
|
||||
|
||||
// Create a bunch of large jobs (>> 50 tasks)
|
||||
Job[] lgJobs = new Job[10];
|
||||
JobId[] lgJobIds = new JobId[10];
|
||||
for (int i = 0; i < lgJobs.length; i++) {
|
||||
lgJobs[i] = mock(Job.class);
|
||||
lgJobIds[i] = mock(JobId.class);
|
||||
when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
|
||||
when(lgJobs[i].getTotalMaps()).thenReturn(700);
|
||||
when(lgJobs[i].getTotalReduces()).thenReturn(50);
|
||||
}
|
||||
|
||||
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
|
||||
when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
|
||||
when(fileInfo.loadJob()).thenReturn(lgJobs[0]).thenReturn(lgJobs[1])
|
||||
.thenReturn(lgJobs[2]).thenReturn(lgJobs[3]).thenReturn(lgJobs[4])
|
||||
.thenReturn(lgJobs[5]).thenReturn(lgJobs[6]).thenReturn(lgJobs[7])
|
||||
.thenReturn(lgJobs[8]).thenReturn(lgJobs[9]);
|
||||
|
||||
// getFullJob will put the job in the cache if it isn't there
|
||||
Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
|
||||
long[] cacheSize = new long[10];
|
||||
for (int i = 0; i < lgJobs.length; i++) {
|
||||
storage.getFullJob(lgJobs[i].getID());
|
||||
assertTrue(jobCache.size() > 0);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadedTasksEmptyConfiguration() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, "");
|
||||
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
JobHistory jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertFalse(storage.getUseLoadedTasksCache());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadedTasksZeroConfiguration() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 0);
|
||||
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
JobHistory jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertTrue(storage.getUseLoadedTasksCache());
|
||||
assertEquals(storage.getLoadedTasksCacheSize(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadedTasksNegativeConfiguration() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, -1);
|
||||
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
JobHistory jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertTrue(storage.getUseLoadedTasksCache());
|
||||
assertEquals(storage.getLoadedTasksCacheSize(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadJobErrorCases() throws IOException {
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
// Set the cache threshold to 50 tasks
|
||||
conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
assertTrue(storage.getUseLoadedTasksCache());
|
||||
assertEquals(storage.getLoadedTasksCacheSize(), 50);
|
||||
|
||||
// Create jobs for bad fileInfo results
|
||||
Job[] jobs = new Job[4];
|
||||
JobId[] jobIds = new JobId[4];
|
||||
for (int i = 0; i < jobs.length; i++) {
|
||||
jobs[i] = mock(Job.class);
|
||||
jobIds[i] = mock(JobId.class);
|
||||
when(jobs[i].getID()).thenReturn(jobIds[i]);
|
||||
when(jobs[i].getTotalMaps()).thenReturn(10);
|
||||
when(jobs[i].getTotalReduces()).thenReturn(2);
|
||||
}
|
||||
|
||||
HistoryFileInfo loadJobException = mock(HistoryFileInfo.class);
|
||||
when(loadJobException.loadJob()).thenThrow(new IOException("History file not found"));
|
||||
when(historyManager.getFileInfo(jobIds[0])).thenThrow(new IOException(""));
|
||||
when(historyManager.getFileInfo(jobIds[1])).thenReturn(null);
|
||||
when(historyManager.getFileInfo(jobIds[2])).thenReturn(loadJobException);
|
||||
|
||||
try {
|
||||
storage.getFullJob(jobIds[0]);
|
||||
fail("Did not get expected YarnRuntimeException for getFileInfo() throwing IOException");
|
||||
} catch (YarnRuntimeException e) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
// fileInfo==null should return null
|
||||
Job job = storage.getFullJob(jobIds[1]);
|
||||
assertNull(job);
|
||||
|
||||
try {
|
||||
storage.getFullJob(jobIds[2]);
|
||||
fail("Did not get expected YarnRuntimeException for fileInfo.loadJob() throwing IOException");
|
||||
} catch (YarnRuntimeException e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user