MAPREDUCE-6927. MR job should only set tracking url if history was successfully written. Contributed by Eric Badger

This commit is contained in:
Jason Lowe 2017-08-08 14:46:47 -05:00
parent acf9bd8b1d
commit 735fce5bec
8 changed files with 168 additions and 10 deletions

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -1404,7 +1405,12 @@ protected void processDoneFiles(JobId jobId) throws IOException {
qualifiedDoneFile = qualifiedDoneFile =
doneDirFS.makeQualified(new Path(doneDirPrefixPath, doneDirFS.makeQualified(new Path(doneDirPrefixPath,
doneJobHistoryFileName)); doneJobHistoryFileName));
moveToDoneNow(qualifiedLogFile, qualifiedDoneFile); if(moveToDoneNow(qualifiedLogFile, qualifiedDoneFile)) {
String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(
getConfig(), context.getApplicationID());
context.setHistoryUrl(historyUrl);
LOG.info("Set historyUrl to " + historyUrl);
}
} }
// Move confFile to Done Folder // Move confFile to Done Folder
@ -1610,7 +1616,7 @@ void shutDownTimer() throws IOException {
} }
} }
private void moveTmpToDone(Path tmpPath) throws IOException { protected void moveTmpToDone(Path tmpPath) throws IOException {
if (tmpPath != null) { if (tmpPath != null) {
String tmpFileName = tmpPath.getName(); String tmpFileName = tmpPath.getName();
String fileName = getFileNameFromTmpFN(tmpFileName); String fileName = getFileNameFromTmpFN(tmpFileName);
@ -1622,7 +1628,9 @@ private void moveTmpToDone(Path tmpPath) throws IOException {
// TODO If the FS objects are the same, this should be a rename instead of a // TODO If the FS objects are the same, this should be a rename instead of a
// copy. // copy.
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException { protected boolean moveToDoneNow(Path fromPath, Path toPath)
throws IOException {
boolean success = false;
// check if path exists, in case of retries it may not exist // check if path exists, in case of retries it may not exist
if (stagingDirFS.exists(fromPath)) { if (stagingDirFS.exists(fromPath)) {
LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString()); LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
@ -1631,14 +1639,19 @@ private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath, boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
false, getConfig()); false, getConfig());
if (copied)
LOG.info("Copied to done location: " + toPath);
else
LOG.info("copy failed");
doneDirFS.setPermission(toPath, new FsPermission( doneDirFS.setPermission(toPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS)); JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
if (copied) {
LOG.info("Copied from: " + fromPath.toString()
+ " to done location: " + toPath.toString());
success = true;
} else {
LOG.info("Copy failed from: " + fromPath.toString()
+ " to done location: " + toPath.toString());
} }
} }
return success;
}
private String getTempFileName(String srcFile) { private String getTempFileName(String srcFile) {
return srcFile + "_tmp"; return srcFile + "_tmp";

View File

@ -69,4 +69,8 @@ public interface AppContext {
String getNMHostname(); String getNMHostname();
TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor(); TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor();
String getHistoryUrl();
void setHistoryUrl(String historyUrl);
} }

View File

@ -1078,6 +1078,7 @@ public class RunningAppContext implements AppContext {
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private TimelineClient timelineClient = null; private TimelineClient timelineClient = null;
private TimelineV2Client timelineV2Client = null; private TimelineV2Client timelineV2Client = null;
private String historyUrl = null;
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
@ -1197,6 +1198,16 @@ public TimelineClient getTimelineClient() {
public TimelineV2Client getTimelineV2Client() { public TimelineV2Client getTimelineV2Client() {
return timelineV2Client; return timelineV2Client;
} }
@Override
public String getHistoryUrl() {
return historyUrl;
}
@Override
public void setHistoryUrl(String historyUrl) {
this.historyUrl = historyUrl;
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -215,9 +215,7 @@ protected void doUnregistration()
} }
LOG.info("Setting job diagnostics to " + sb.toString()); LOG.info("Setting job diagnostics to " + sb.toString());
String historyUrl = String historyUrl = context.getHistoryUrl();
MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
context.getApplicationID());
LOG.info("History url is " + historyUrl); LOG.info("History url is " + historyUrl);
FinishApplicationMasterRequest request = FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState, FinishApplicationMasterRequest.newInstance(finishState,

View File

@ -21,6 +21,9 @@
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -62,6 +65,7 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -920,6 +924,104 @@ public void testSigTermedFunctionality() throws IOException {
jheh.lastEventHandled.getHistoryEvent() jheh.lastEventHandled.getHistoryEvent()
instanceof JobUnsuccessfulCompletionEvent); instanceof JobUnsuccessfulCompletionEvent);
} }
@Test (timeout=50000)
public void testSetTrackingURLAfterHistoryIsWritten() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes and successfully writes history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(
conf, t.mockAppContext.getApplicationID());
verify(t.mockAppContext, times(1)).setHistoryUrl(historyUrl);
} finally {
jheh.stop();
}
}
@Test (timeout=50000)
public void testDontSetTrackingURLIfHistoryWriteFailed() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
doReturn(false).when(jheh).moveToDoneNow(any(Path.class),
any(Path.class));
doNothing().when(jheh).moveTmpToDone(any(Path.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes, but doesn't successfully write history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
} finally {
jheh.stop();
}
}
@Test (timeout=50000)
public void testDontSetTrackingURLIfHistoryWriteThrows() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
doThrow(new YarnRuntimeException(new IOException()))
.when(jheh).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes, but doesn't successfully write history
try {
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
throw new RuntimeException(
"processDoneFiles didn't throw, but should have");
} catch (YarnRuntimeException yre) {
// Exception expected, do nothing
}
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
} finally {
jheh.stop();
}
}
} }
class JHEvenHandlerForTest extends JobHistoryEventHandler { class JHEvenHandlerForTest extends JobHistoryEventHandler {

View File

@ -154,4 +154,14 @@ public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null; return null;
} }
@Override
public String getHistoryUrl() {
return null;
}
@Override
public void setHistoryUrl(String historyUrl) {
return;
}
} }

View File

@ -896,5 +896,15 @@ public String getNMHostname() {
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null; return null;
} }
@Override
public String getHistoryUrl() {
return null;
}
@Override
public void setHistoryUrl(String historyUrl) {
return;
}
} }
} }

View File

@ -407,4 +407,14 @@ public String getNMHostname() {
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null; return null;
} }
@Override
public String getHistoryUrl() {
return null;
}
@Override
public void setHistoryUrl(String historyUrl) {
return;
}
} }