diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index ce3b3cc596..b34cd7fd88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -142,7 +143,8 @@ public abstract class TaskImpl implements Task, EventHandler { private boolean historyTaskStartGenerated = false; // Launch time reported in history events. private long launchTime; - + private boolean speculationEnabled = false; + private static final SingleArcTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); private static final SingleArcTransition @@ -325,6 +327,9 @@ public TaskImpl(JobId jobId, TaskType taskType, int partition, this.appContext = appContext; this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT); + this.speculationEnabled = taskType.equals(TaskType.MAP) ? + conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) : + conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); // This "this leak" is okay because the retained pointer is in an // instance variable. @@ -1079,13 +1084,19 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { if (task.successfulAttempt == null) { boolean shouldAddNewAttempt = true; if (task.inProgressAttempts.size() > 0) { - // if not all of the inProgressAttempts are hanging for resource - for (TaskAttemptId attemptId : task.inProgressAttempts) { - if (((TaskAttemptImpl) task.getAttempt(attemptId)) - .isContainerAssigned()) { - shouldAddNewAttempt = false; - break; + if(task.speculationEnabled) { + // if not all of the inProgressAttempts are hanging for resource + for (TaskAttemptId attemptId : task.inProgressAttempts) { + if (((TaskAttemptImpl) task.getAttempt(attemptId)) + .isContainerAssigned()) { + shouldAddNewAttempt = false; + break; + } } + } else { + // No need to add new attempt if there are in progress attempts + // when speculation is false + shouldAddNewAttempt = false; } } if (shouldAddNewAttempt) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java index fe21f07524..8527dc3c9e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java @@ -192,6 +192,46 @@ public void reduce(Text key, Iterable values, } } + public static class FailOnceMapper extends + Mapper { + + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + TaskAttemptID taid = context.getTaskAttemptID(); + try{ + Thread.sleep(2000); + } catch(InterruptedException ie) { + // Ignore + } + // Fail mapper only for first attempt + if (taid.getId() == 0) { + throw new RuntimeException("Failing this mapper"); + } + + context.write(value, new IntWritable(1)); + } + } + + public static class FailOnceReducer extends + Reducer { + + public void reduce(Text key, Iterable values, + Context context) throws IOException, InterruptedException { + TaskAttemptID taid = context.getTaskAttemptID(); + try{ + Thread.sleep(2000); + } catch(InterruptedException ie) { + // Ignore + } + // Fail reduce only for first attempt + if (taid.getId() == 0) { + throw new RuntimeException("Failing this reducer"); + } + context.write(key, new IntWritable(0)); + } + } + + @Test public void testSpeculativeExecution() throws Exception { if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { @@ -218,6 +258,30 @@ public void testSpeculativeExecution() throws Exception { Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS) .getValue()); + + /*------------------------------------------------------------------ + * Test that Map/Red does not speculate if MAP_SPECULATIVE and + * REDUCE_SPECULATIVE are both false. When map tasks fail once and time out, + * we shouldn't launch two simultaneous attempts. MAPREDUCE-7278 + * ----------------------------------------------------------------- + */ + job = runNonSpecFailOnceTest(); + + succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + counters = job.getCounters(); + // We will have 4 total since 2 map tasks fail and relaunch attempt once + Assert.assertEquals(4, + counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue()); + Assert.assertEquals(4, + counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); + // Ensure no maps or reduces killed due to accidental speculation + Assert.assertEquals(0, + counters.findCounter(JobCounter.NUM_KILLED_MAPS).getValue()); + Assert.assertEquals(0, + counters.findCounter(JobCounter.NUM_KILLED_REDUCES).getValue()); + /*---------------------------------------------------------------------- * Test that Mapper speculates if MAP_SPECULATIVE is true and * REDUCE_SPECULATIVE is false. @@ -295,7 +359,48 @@ private Job runSpecTest(boolean mapspec, boolean redspec) // Delete output directory if it exists. try { - localFs.delete(TEST_OUT_DIR,true); + localFs.delete(TEST_OUT_DIR, true); + } catch (IOException e) { + // ignore + } + + // Creates the Job Configuration + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.setMaxMapAttempts(2); + + job.submit(); + + return job; + } + + private Job runNonSpecFailOnceTest() + throws IOException, ClassNotFoundException, InterruptedException { + + Path first = createTempFile("specexec_map_input1", "a\nz"); + Path secnd = createTempFile("specexec_map_input2", "a\nz"); + + Configuration conf = mrCluster.getConfig(); + conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); + conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + // Prevent blacklisting since tasks fail once + conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, false); + // Setting small task exit timeout values reproduces MAPREDUCE-7278 + conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 20); + conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); + Job job = Job.getInstance(conf); + job.setJarByClass(TestSpeculativeExecution.class); + job.setMapperClass(FailOnceMapper.class); + job.setReducerClass(FailOnceReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setNumReduceTasks(2); + FileInputFormat.setInputPaths(job, first); + FileInputFormat.addInputPath(job, secnd); + FileOutputFormat.setOutputPath(job, TEST_OUT_DIR); + + // Delete output directory if it exists. + try { + localFs.delete(TEST_OUT_DIR, true); } catch (IOException e) { // ignore }