diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index dcf701dd95..515ded4f7a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -558,6 +558,10 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4554. Job Credentials are not transmitted if security is turned off (Benoy Antony via bobby) + MAPREDUCE-4705. Fix a bug in job history lookup, which makes older jobs + inaccessible despite the presence of a valid history file. (Jason Lowe + via sseth) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 85fcdc6bd5..f7bc50609a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -130,7 +131,7 @@ public synchronized Set get(String serialPart) { } } - private static class JobListCache { + static class JobListCache { private ConcurrentSkipListMap cache; private int maxSize; private long maxAge; @@ -239,12 +240,14 @@ private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile, : HistoryInfoState.IN_INTERMEDIATE; } - private synchronized boolean isMovePending() { + @VisibleForTesting + synchronized boolean isMovePending() { return state == HistoryInfoState.IN_INTERMEDIATE || state == HistoryInfoState.MOVE_FAILED; } - private synchronized boolean didMoveFail() { + @VisibleForTesting + synchronized boolean didMoveFail() { return state == HistoryInfoState.MOVE_FAILED; } @@ -365,7 +368,7 @@ public synchronized Configuration loadConfFile() throws IOException { } private SerialNumberIndex serialNumberIndex = null; - private JobListCache jobListCache = null; + protected JobListCache jobListCache = null; // Maintains a list of known done subdirectories. private final Set existingDoneSubdirs = Collections @@ -707,8 +710,8 @@ private HistoryFileInfo getJobFileInfo(List fileStatusList, * @throws IOException */ private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { - int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); - String boxedSerialNumber = String.valueOf(jobSerialNumber); + String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( + jobId, serialNumberFormat); Set dateStringSet = serialNumberIndex.get(boxedSerialNumber); if (dateStringSet == null) { return null; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index f9acb1a382..e24cf05d79 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -460,6 +461,51 @@ public void testCountersForFailedTask() throws Exception { } } + @Test + public void testScanningOldDirs() throws Exception { + LOG.info("STARTING testScanningOldDirs"); + try { + Configuration conf = new Configuration(); + conf + .setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = + new MRAppWithHistory(1, 1, true, + this.getClass().getName(), true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); + app.waitForState(job, JobState.SUCCEEDED); + + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); + + HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); + hfm.init(conf); + HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate job history", fileInfo); + + // force the manager to "forget" the job + hfm.deleteJobFromJobListCache(fileInfo); + final int msecPerSleep = 10; + int msecToSleep = 10 * 1000; + while (fileInfo.isMovePending() && msecToSleep > 0) { + Assert.assertTrue(!fileInfo.didMoveFail()); + msecToSleep -= msecPerSleep; + Thread.sleep(msecPerSleep); + } + Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); + + fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate old job history", fileInfo); + } finally { + LOG.info("FINISHED testScanningOldDirs"); + } + } + static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, @@ -500,6 +546,12 @@ protected void attemptLaunched(TaskAttemptId attemptID) { } } + static class HistoryFileManagerForTest extends HistoryFileManager { + void deleteJobFromJobListCache(HistoryFileInfo fileInfo) { + jobListCache.delete(fileInfo); + } + } + public static void main(String[] args) throws Exception { TestJobHistoryParsing t = new TestJobHistoryParsing(); t.testHistoryParsing();