MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM. Contributed by Chang Li

This commit is contained in:
Jason Lowe 2015-03-18 19:29:56 +00:00
parent fdd58aa32f
commit 30da99cbaf
3 changed files with 65 additions and 1 deletions

View File

@ -458,6 +458,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-4742. Fix typo in nnbench#displayUsage. (Liang Xie via ozawa)
MAPREDUCE-6277. Job can post multiple history files if attempt loses
connection to the RM (Chang Li via jlowe)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -708,7 +708,7 @@ private List<Container> getResources() throws Exception {
if (System.currentTimeMillis() - retrystartTime >= retryInterval) { if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
LOG.error("Could not contact RM after " + retryInterval + " milliseconds."); LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(), eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR)); JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Could not contact RM after " + throw new YarnRuntimeException("Could not contact RM after " +
retryInterval + " milliseconds."); retryInterval + " milliseconds.");
} }

View File

@ -65,6 +65,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@ -1609,6 +1611,7 @@ private static class MyContainerAllocator extends RMContainerAllocator {
= new ArrayList<TaskAttemptKillEvent>(); = new ArrayList<TaskAttemptKillEvent>();
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
= new ArrayList<JobUpdatedNodesEvent>(); = new ArrayList<JobUpdatedNodesEvent>();
static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
private MyResourceManager rm; private MyResourceManager rm;
private boolean isUnregistered = false; private boolean isUnregistered = false;
private AllocateResponse allocateResponse; private AllocateResponse allocateResponse;
@ -1631,6 +1634,8 @@ public void handle(Event event) {
taskAttemptKillEvents.add((TaskAttemptKillEvent)event); taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
} else if (event instanceof JobUpdatedNodesEvent) { } else if (event instanceof JobUpdatedNodesEvent) {
jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
} else if (event instanceof JobEvent) {
jobEvents.add((JobEvent)event);
} }
} }
}); });
@ -1785,6 +1790,18 @@ protected AllocateResponse makeRemoteRequest() throws IOException,
} }
} }
/**
 * Test allocator whose remote request to the RM always fails.
 *
 * Every call to {@link #makeRemoteRequest()} throws a
 * {@link YarnRuntimeException}; all other behaviour is inherited unchanged
 * from {@link MyContainerAllocator}.
 */
private static class MyContainerAllocator2 extends MyContainerAllocator {

  /**
   * Forwards all arguments, unmodified, to the parent allocator.
   */
  public MyContainerAllocator2(MyResourceManager rm, Configuration conf,
      ApplicationAttemptId appAttemptId, Job job) {
    super(rm, conf, appAttemptId, job);
  }

  /**
   * Never returns a response.
   *
   * @throws YarnRuntimeException always, with the message "for testing"
   */
  @Override
  protected AllocateResponse makeRemoteRequest()
      throws IOException, YarnException {
    final YarnRuntimeException simulatedFailure =
        new YarnRuntimeException("for testing");
    throw simulatedFailure;
  }
}
@Test @Test
public void testReduceScheduling() throws Exception { public void testReduceScheduling() throws Exception {
int totalMaps = 10; int totalMaps = 10;
@ -2312,6 +2329,50 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart()
} }
/**
 * Verifies that when the allocator cannot reach the RM, schedule() throws a
 * YarnRuntimeException and exactly one JobEvent of type JOB_AM_REBOOT is
 * delivered to the event handler.
 *
 * The allocator used here (MyContainerAllocator2) throws from every
 * makeRemoteRequest() call, and the AM-to-RM wait interval is set to 0.
 */
@Test(timeout=60000)
public void testRMUnavailable()
    throws Exception {
  Configuration conf = new Configuration();
  // A zero wait interval is used so the test does not have to sit through a
  // retry window. NOTE(review): presumably this makes the very first failed
  // contact exceed the interval - confirm against getResources().
  conf.setInt(
      MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);

  MyResourceManager rm1 = new MyResourceManager(conf);
  rm1.start();
  try {
    DrainDispatcher dispatcher =
        (DrainDispatcher) rm1.getRMContext().getDispatcher();

    // Submit an application and register a node manager with the RM.
    RMApp app = rm1.submitApp(1024);
    dispatcher.await();

    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
    nm1.registerNode();
    nm1.nodeHeartbeat(true);
    dispatcher.await();

    ApplicationAttemptId appAttemptId =
        app.getCurrentAppAttempt().getAppAttemptId();
    rm1.sendAMLaunched(appAttemptId);
    dispatcher.await();

    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
    Job mockJob = mock(Job.class);
    when(mockJob.getReport()).thenReturn(
        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));

    MyContainerAllocator2 allocator =
        new MyContainerAllocator2(rm1, conf, appAttemptId, mockJob);

    // jobEvents is a static list on MyContainerAllocator, so it is shared
    // across tests; reference it through the declaring class and reset it
    // before exercising the allocator.
    MyContainerAllocator.jobEvents.clear();

    try {
      allocator.schedule();
      Assert.fail("Should Have Exception");
    } catch (YarnRuntimeException e) {
      Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
    }
    dispatcher.await();

    Assert.assertEquals("Should Have 1 Job Event", 1,
        MyContainerAllocator.jobEvents.size());
    JobEvent event = MyContainerAllocator.jobEvents.get(0);
    // assertEquals reports the actual event type if the expectation fails.
    Assert.assertEquals("Should Reboot",
        JobEventType.JOB_AM_REBOOT, event.getType());
  } finally {
    // Stop the RM that was started above so the test does not leak it.
    rm1.stop();
  }
}
@Test(timeout=60000) @Test(timeout=60000)
public void testAMRMTokenUpdate() throws Exception { public void testAMRMTokenUpdate() throws Exception {
LOG.info("Running testAMRMTokenUpdate"); LOG.info("Running testAMRMTokenUpdate");