MAPREDUCE-4705. Fix a bug in job history lookup, which makes older jobs inaccessible despite the presence of a valid history file. (Contributed by Jason Lowe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1395850 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1e30e49bf6
commit
8ac3910ae0
@ -558,6 +558,10 @@ Release 0.23.5 - UNRELEASED
|
||||
MAPREDUCE-4554. Job Credentials are not transmitted if security is turned
|
||||
off (Benoy Antony via bobby)
|
||||
|
||||
MAPREDUCE-4705. Fix a bug in job history lookup, which makes older jobs
|
||||
inaccessible despite the presence of a valid history file. (Jason Lowe
|
||||
via sseth)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -62,6 +62,7 @@
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
@ -130,7 +131,7 @@ public synchronized Set<String> get(String serialPart) {
|
||||
}
|
||||
}
|
||||
|
||||
private static class JobListCache {
|
||||
static class JobListCache {
|
||||
private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
|
||||
private int maxSize;
|
||||
private long maxAge;
|
||||
@ -239,12 +240,14 @@ private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
|
||||
: HistoryInfoState.IN_INTERMEDIATE;
|
||||
}
|
||||
|
||||
private synchronized boolean isMovePending() {
|
||||
@VisibleForTesting
|
||||
synchronized boolean isMovePending() {
|
||||
// Pending covers two states: IN_INTERMEDIATE and MOVE_FAILED, so a file
// whose move has failed is still reported as pending.
return state == HistoryInfoState.IN_INTERMEDIATE
|
||||
|| state == HistoryInfoState.MOVE_FAILED;
|
||||
}
|
||||
|
||||
private synchronized boolean didMoveFail() {
|
||||
@VisibleForTesting
|
||||
synchronized boolean didMoveFail() {
|
||||
// True only when the state is exactly MOVE_FAILED.
return state == HistoryInfoState.MOVE_FAILED;
|
||||
}
|
||||
|
||||
@ -365,7 +368,7 @@ public synchronized Configuration loadConfFile() throws IOException {
|
||||
}
|
||||
|
||||
private SerialNumberIndex serialNumberIndex = null;
|
||||
private JobListCache jobListCache = null;
|
||||
protected JobListCache jobListCache = null;
|
||||
|
||||
// Maintains a list of known done subdirectories.
|
||||
private final Set<Path> existingDoneSubdirs = Collections
|
||||
@ -707,8 +710,8 @@ private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
|
||||
* @throws IOException
|
||||
*/
|
||||
private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
|
||||
int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
|
||||
String boxedSerialNumber = String.valueOf(jobSerialNumber);
|
||||
String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
|
||||
jobId, serialNumberFormat);
|
||||
Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
|
||||
if (dateStringSet == null) {
|
||||
return null;
|
||||
|
@ -60,6 +60,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||
@ -460,6 +461,51 @@ public void testCountersForFailedTask() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Runs a one-map/one-reduce job to completion, looks its history up through
 * a HistoryFileManagerForTest, removes the entry from the manager's job list
 * cache, waits for the pending history move to finish, and then verifies
 * that a second lookup by job id still returns a non-null HistoryFileInfo.
 * NOTE(review): the second lookup is presumably served by the old-directory
 * scan path (scanOldDirsForJob) -- confirm against
 * HistoryFileManager.getFileInfo.
 *
 * @throws Exception if job submission, history lookup or the polling sleep
 *           fails
 */
@Test
|
||||
public void testScanningOldDirs() throws Exception {
|
||||
LOG.info("STARTING testScanningOldDirs");
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
// Use the test's MyResolver as the DNSToSwitchMapping implementation
// before initializing RackResolver with this configuration.
conf
|
||||
.setClass(
|
||||
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||
MyResolver.class, DNSToSwitchMapping.class);
|
||||
RackResolver.init(conf);
|
||||
MRApp app =
|
||||
new MRAppWithHistory(1, 1, true,
|
||||
this.getClass().getName(), true);
|
||||
app.submit(conf);
|
||||
// Only one job was submitted, so take the first (sole) entry.
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||
JobId jobId = job.getID();
|
||||
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
|
||||
// make sure all events are flushed
|
||||
app.waitForState(Service.STATE.STOPPED);
|
||||
|
||||
HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
|
||||
hfm.init(conf);
|
||||
// First lookup: the history must be locatable before the cache entry
// is removed.
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
|
||||
Assert.assertNotNull("Unable to locate job history", fileInfo);
|
||||
|
||||
// force the manager to "forget" the job
|
||||
hfm.deleteJobFromJobListCache(fileInfo);
|
||||
// Poll in 10 ms steps, for at most 10 seconds, until the move is no
// longer pending.
final int msecPerSleep = 10;
|
||||
int msecToSleep = 10 * 1000;
|
||||
while (fileInfo.isMovePending() && msecToSleep > 0) {
|
||||
// Fail immediately on a failed move instead of polling until the
// time budget is exhausted.
Assert.assertTrue(!fileInfo.didMoveFail());
|
||||
msecToSleep -= msecPerSleep;
|
||||
Thread.sleep(msecPerSleep);
|
||||
}
|
||||
// NOTE(review): if the move completes during the final sleep,
// msecToSleep is already 0 and this assertion reports a timeout even
// though the move finished; asserting !fileInfo.isMovePending() would
// avoid that edge case.
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
|
||||
|
||||
// Second lookup, after the cache entry was deleted and the move is no
// longer pending.
fileInfo = hfm.getFileInfo(jobId);
|
||||
Assert.assertNotNull("Unable to locate old job history", fileInfo);
|
||||
} finally {
|
||||
LOG.info("FINISHED testScanningOldDirs");
|
||||
}
|
||||
}
|
||||
|
||||
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
|
||||
|
||||
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
|
||||
@ -500,6 +546,12 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Test-only subclass of HistoryFileManager that lets a test remove an entry
 * from the inherited jobListCache field.
 */
static class HistoryFileManagerForTest extends HistoryFileManager {
|
||||
/**
 * Deletes the given history file entry from the manager's job list cache.
 *
 * @param fileInfo the cache entry to remove
 */
void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
|
||||
jobListCache.delete(fileInfo);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestJobHistoryParsing t = new TestJobHistoryParsing();
|
||||
t.testHistoryParsing();
|
||||
|
Loading…
Reference in New Issue
Block a user