MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly to clients in case of failed jobs also. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1383709 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-09-12 00:59:15 +00:00
parent 11cf00b08a
commit 229a79bbc3
6 changed files with 66 additions and 14 deletions

View File

@ -141,6 +141,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4607. Race condition in ReduceTask completion can result in Task
being incorrectly failed. (Bikas Saha via tomwhite)
MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly
to clients in case of failed jobs also. (Jason Lowe via vinodkv)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES

View File

@ -582,17 +582,23 @@ public JobReport getReport() {
String jobFile =
remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
StringBuilder diagsb = new StringBuilder();
for (String s : getDiagnostics()) {
diagsb.append(s).append("\n");
}
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, jobFile, amInfos, isUber);
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
}
computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username,
state, appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress,
cleanupProgress, jobFile, amInfos, isUber);
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
return report;
} finally {
readLock.unlock();
}

View File

@ -138,7 +138,7 @@ public void testSimple() throws Exception {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -215,7 +215,7 @@ public void testResource() throws Exception {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -281,7 +281,7 @@ public void testMapReduceScheduling() throws Exception {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -723,7 +723,7 @@ public void testBlackListedNodes() throws Exception {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -827,7 +827,7 @@ public void testIgnoreBlacklisting() throws Exception {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator =
new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
@ -993,7 +993,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -1445,7 +1445,7 @@ public void testCompletedTasksRecalculateSchedule() throws Exception {
Job job = mock(Job.class);
when(job.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
doReturn(10).when(job).getTotalMaps();
doReturn(10).when(job).getTotalReduces();
doReturn(0).when(job).getCompletedMaps();

View File

@ -45,11 +45,14 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Records;
@ -172,6 +175,8 @@ public static void main(String[] args) throws Exception {
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
t.testCheckAccess();
t.testReportDiagnostics();
t.testUberDecision();
}
@Test
@ -241,6 +246,41 @@ public void testCheckAccess() {
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@Test
public void testReportDiagnostics() throws Exception {
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
final String diagMsg = "some diagnostic message";
final JobDiagnosticsUpdateEvent diagUpdateEvent =
new JobDiagnosticsUpdateEvent(jobId, diagMsg);
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, mock(OutputCommitter.class),
true, null, 0, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
Assert.assertTrue(diagnostics.contains(diagMsg));
job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, mock(OutputCommitter.class),
true, null, 0, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
Assert.assertTrue(diagnostics.contains(diagMsg));
}
@Test
public void testUberDecision() throws Exception {

View File

@ -67,7 +67,7 @@ public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile, List<AMInfo> amInfos,
boolean isUber) {
boolean isUber, String diagnostics) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
@ -83,6 +83,7 @@ public static JobReport newJobReport(JobId jobId, String jobName,
report.setJobFile(jobFile);
report.setAMInfos(amInfos);
report.setIsUber(isUber);
report.setDiagnostics(diagnostics);
return report;
}

View File

@ -219,7 +219,8 @@ public void testReconnectOnAMRestart() throws IOException {
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
when(jobReportResponse1.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
false, ""));
// First AM returns a report with jobName firstGen and simulates AM shutdown
// on second invocation.
@ -231,7 +232,8 @@ public void testReconnectOnAMRestart() throws IOException {
GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
when(jobReportResponse2.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
false, ""));
// Second AM generation returns a report with jobName secondGen
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);