MAPREDUCE-3484. Fixed JobEndNotifier to not get interrupted before completing all its retries. Contributed by Ravi Prakash.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214563 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3758710bff
commit
cfbde6ac0f
@ -302,6 +302,9 @@ Release 0.23.1 - Unreleased
|
|||||||
MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the
|
MAPREDUCE-3530. Fixed an NPE occurring during scheduling in the
|
||||||
ResourceManager. (Arun C Murthy via vinodkv)
|
ResourceManager. (Arun C Murthy via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3484. Fixed JobEndNotifier to not get interrupted before completing
|
||||||
|
all its retries. (Ravi Prakash via vinodkv)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -375,6 +375,16 @@ public void handle(JobFinishEvent event) {
|
|||||||
// this is the only job, so shut down the Appmaster
|
// this is the only job, so shut down the Appmaster
|
||||||
// note in a workflow scenario, this may lead to creation of a new
|
// note in a workflow scenario, this may lead to creation of a new
|
||||||
// job (FIXME?)
|
// job (FIXME?)
|
||||||
|
try {
|
||||||
|
LOG.info("Job end notification started for jobID : "
|
||||||
|
+ job.getReport().getJobId());
|
||||||
|
JobEndNotifier notifier = new JobEndNotifier();
|
||||||
|
notifier.setConf(getConfig());
|
||||||
|
notifier.notify(job.getReport());
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.warn("Job end notification interrupted for jobID : "
|
||||||
|
+ job.getReport().getJobId(), ie );
|
||||||
|
}
|
||||||
|
|
||||||
// TODO:currently just wait for some time so clients can know the
|
// TODO:currently just wait for some time so clients can know the
|
||||||
// final states. Will be removed once RM come on.
|
// final states. Will be removed once RM come on.
|
||||||
@ -390,16 +400,6 @@ public void handle(JobFinishEvent event) {
|
|||||||
stop();
|
stop();
|
||||||
|
|
||||||
// Send job-end notification
|
// Send job-end notification
|
||||||
try {
|
|
||||||
LOG.info("Job end notification started for jobID : "
|
|
||||||
+ job.getReport().getJobId());
|
|
||||||
JobEndNotifier notifier = new JobEndNotifier();
|
|
||||||
notifier.setConf(getConfig());
|
|
||||||
notifier.notify(job.getReport());
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
LOG.warn("Job end notification interrupted for jobID : "
|
|
||||||
+ job.getReport().getJobId(), ie );
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Graceful stop failed ", t);
|
LOG.warn("Graceful stop failed ", t);
|
||||||
}
|
}
|
||||||
|
@ -96,13 +96,20 @@ public void testNotifyRetries() throws InterruptedException {
|
|||||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
|
||||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
|
||||||
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
|
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3");
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3");
|
||||||
JobReport jobReport = Mockito.mock(JobReport.class);
|
JobReport jobReport = Mockito.mock(JobReport.class);
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
this.notificationCount = 0;
|
this.notificationCount = 0;
|
||||||
this.setConf(conf);
|
this.setConf(conf);
|
||||||
this.notify(jobReport);
|
this.notify(jobReport);
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
Assert.assertEquals("Only 3 retries were expected but was : "
|
Assert.assertEquals("Only 3 retries were expected but was : "
|
||||||
+ this.notificationCount, this.notificationCount, 3);
|
+ this.notificationCount, this.notificationCount, 3);
|
||||||
|
Assert.assertTrue("Should have taken more than 9 seconds it took "
|
||||||
|
+ (endTime - startTime), endTime - startTime > 9000);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user