MAPREDUCE-3058. Fixed MR YarnChild to report failure when task throws an error and thus prevent a hanging task and job. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1187654 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-22 06:14:46 +00:00
parent e175574635
commit 5795fcfd99
5 changed files with 22 additions and 2 deletions

View File

@ -1729,6 +1729,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3242. Trunk compilation broken with bad interaction from
MAPREDUCE-3070 and MAPREDUCE-3239. (mahadev)
MAPREDUCE-3058. Fixed MR YarnChild to report failure when task throws an
error and thus prevent a hanging task and job. (vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -177,7 +177,7 @@ public Object run() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
if (taskid != null) {
umbilical.reportDiagnosticInfo(taskid, baos.toString());
umbilical.fatalError(taskid, baos.toString());
}
} catch (Throwable throwable) {
LOG.fatal("Error running child : "

View File

@ -30,6 +30,22 @@
public class FailingMapper extends Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value,
Context context) throws IOException,InterruptedException {
// Just create a non-daemon thread which hangs forever. MR AM should not be
// hung by this.
new Thread() {
@Override
public void run() {
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
//
}
}
}
}.start();
if (context.getTaskAttemptID().getId() == 0) {
System.out.println("Attempt:" + context.getTaskAttemptID() +
" Failing mapper throwing exception");

View File

@ -299,7 +299,6 @@ protected Job runFailingMapperJob()
throws IOException, InterruptedException, ClassNotFoundException {
Configuration myConf = new Configuration(mrCluster.getConfig());
myConf.setInt(MRJobConfig.NUM_MAPS, 1);
myConf.setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
Job job = new Job(myConf);

View File

@ -351,9 +351,11 @@ protected void setup(Context ctxt)
matcher = new ResourceUsageMatcherRunner(ctxt,
split.getMapResourceUsageMetrics());
matcher.setDaemon(true);
// start the status reporter thread
reporter = new StatusReporter(ctxt, matcher);
reporter.setDaemon(true);
reporter.start();
}