From 30da99cbaf36aeef38a858251ce8ffa5eb657b38 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 18 Mar 2015 19:29:56 +0000 Subject: [PATCH] MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM. Contributed by Chang Li --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/rm/RMContainerAllocator.java | 2 +- .../v2/app/rm/TestRMContainerAllocator.java | 61 +++++++++++++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4a9b4c716b..a832db4244 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -458,6 +458,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-4742. Fix typo in nnbench#displayUsage. (Liang Xie via ozawa) + MAPREDUCE-6277. Job can post multiple history files if attempt loses + connection to the RM (Chang Li via jlowe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index efea674fbd..8cdcaa840b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -708,7 +708,7 @@ private List getResources() throws Exception { if (System.currentTimeMillis() - retrystartTime >= retryInterval) { LOG.error("Could not contact RM after " + retryInterval + " milliseconds."); eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.INTERNAL_ERROR)); + JobEventType.JOB_AM_REBOOT)); throw new YarnRuntimeException("Could not contact RM after " + retryInterval + " milliseconds."); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index eca1a4dddc..3b7a432323 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -65,6 +65,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; @@ -1609,6 +1611,7 @@ private static class MyContainerAllocator extends RMContainerAllocator { = new ArrayList(); static final List jobUpdatedNodeEvents = new ArrayList(); + static final List jobEvents = new ArrayList(); private MyResourceManager rm; private boolean isUnregistered = false; private AllocateResponse allocateResponse; @@ -1631,6 +1634,8 @@ public void handle(Event event) { taskAttemptKillEvents.add((TaskAttemptKillEvent)event); } else if (event instanceof JobUpdatedNodesEvent) { jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); + } else if (event instanceof JobEvent) { + jobEvents.add((JobEvent)event); } } }); @@ -1785,6 +1790,18 @@ protected AllocateResponse makeRemoteRequest() throws IOException, } } + private static class MyContainerAllocator2 extends MyContainerAllocator { + public MyContainerAllocator2(MyResourceManager rm, Configuration conf, + ApplicationAttemptId appAttemptId, Job job) { + super(rm, conf, appAttemptId, job); + } + @Override + protected AllocateResponse makeRemoteRequest() throws IOException, + YarnException { + throw new YarnRuntimeException("for testing"); + } + } + @Test public void testReduceScheduling() throws Exception { int totalMaps = 10; @@ -2312,6 +2329,50 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() } + @Test + public void testRMUnavailable() + throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0); + MyResourceManager rm1 = new MyResourceManager(conf); + rm1.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm1.getRMContext().getDispatcher(); + RMApp app = rm1.submitApp(1024); + dispatcher.await(); + + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = + app.getCurrentAppAttempt().getAppAttemptId(); + rm1.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator2 allocator = + new MyContainerAllocator2(rm1, conf, appAttemptId, mockJob); + allocator.jobEvents.clear(); + try { + allocator.schedule(); + Assert.fail("Should Have Exception"); + } catch (YarnRuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Could not contact RM after")); + } + dispatcher.await(); + Assert.assertEquals("Should Have 1 Job Event", 1, + allocator.jobEvents.size()); + JobEvent event = allocator.jobEvents.get(0); + Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT)); + } + @Test(timeout=60000) public void testAMRMTokenUpdate() throws Exception { LOG.info("Running testAMRMTokenUpdate");