MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko

This commit is contained in:
Jason Lowe 2018-01-24 14:44:07 -06:00
parent 55c32776b1
commit cff9edd4b5
3 changed files with 57 additions and 7 deletions

View File

@ -173,9 +173,14 @@ private Job loadJob(JobId jobId) throws RuntimeException, IOException {
HistoryFileInfo fileInfo;
fileInfo = hsManager.getFileInfo(jobId);
if (fileInfo == null) {
throw new HSFileRuntimeException("Unable to find job " + jobId);
} else if (fileInfo.isDeleted()) {
}
fileInfo.waitUntilMoved();
if (fileInfo.isDeleted()) {
throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
} else {
return fileInfo.loadJob();
@ -211,6 +216,7 @@ public Map<JobId, Job> getAllPartialJobs() {
for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
if (mi != null) {
JobId id = mi.getJobId();
mi.waitUntilMoved();
result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
}
}

View File

@ -452,6 +452,8 @@ synchronized void moveToDone() throws IOException {
} catch (Throwable t) {
LOG.error("Error while trying to move a job to done", t);
this.state = HistoryInfoState.MOVE_FAILED;
} finally {
notifyAll();
}
}
@ -485,12 +487,16 @@ public synchronized Path getHistoryFile() {
}
protected synchronized void delete() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("deleting " + historyFile + " and " + confFile);
try {
if (LOG.isDebugEnabled()) {
LOG.debug("deleting " + historyFile + " and " + confFile);
}
state = HistoryInfoState.DELETED;
doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
} finally {
notifyAll();
}
state = HistoryInfoState.DELETED;
doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
}
public JobIndexInfo getJobIndexInfo() {
@ -517,6 +523,17 @@ private boolean isOversized() {
jobIndexInfo.getNumMaps();
return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
}
public synchronized void waitUntilMoved() {
while (isMovePending() && !didMoveFail()) {
try {
wait();
} catch (InterruptedException e) {
LOG.warn("Waiting has been interrupted");
throw new RuntimeException(e);
}
}
}
}
private SerialNumberIndex serialNumberIndex = null;
@ -956,6 +973,7 @@ private void scanIntermediateDirectory(final Path absPath) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling move to done of " +found);
}
moveToDoneExecutor.execute(new Runnable() {
@Override
public void run() {
@ -1193,5 +1211,5 @@ protected boolean deleteDir(FileStatus serialDir)
@VisibleForTesting
void setMaxHistoryAge(long newValue){
maxHistoryAge=newValue;
}
}
}

View File

@ -445,6 +445,32 @@ public void testRefreshJobRetentionSettings() throws IOException,
verify(fileInfo, timeout(20000).times(2)).delete();
}
@Test
public void testCachedStorageWaitsForFileMove() throws IOException {
HistoryFileManager historyManager = mock(HistoryFileManager.class);
jobHistory = spy(new JobHistory());
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
Configuration conf = new Configuration();
jobHistory.init(conf);
jobHistory.start();
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
.getHistoryStorage());
Job job = mock(Job.class);
JobId jobId = mock(JobId.class);
when(job.getID()).thenReturn(jobId);
when(job.getTotalMaps()).thenReturn(10);
when(job.getTotalReduces()).thenReturn(2);
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo);
when(fileInfo.loadJob()).thenReturn(job);
storage.getFullJob(jobId);
verify(fileInfo).waitUntilMoved();
}
@Test
public void testRefreshLoadedJobCacheUnSupportedOperation() {
jobHistory = spy(new JobHistory());