diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index 6f9cc34e92..fed500a429 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -510,7 +510,7 @@ private void runSubtask(org.apache.hadoop.mapred.Task task, String cause = (tCause == null) ? throwable.getMessage() : StringUtils .stringifyException(tCause); - umbilical.fatalError(classicAttemptID, cause); + umbilical.fatalError(classicAttemptID, cause, false); } throw new RuntimeException(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 556c90c441..b155af220f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; @@ -281,7 +282,7 @@ public void done(TaskAttemptID taskAttemptID) throws IOException { } @Override - public void fatalError(TaskAttemptID taskAttemptID, String msg) + public void fatalError(TaskAttemptID taskAttemptID, String msg, boolean fastFail) throws IOException { // This happens only in Child and in the Task. 
LOG.error("Task: " + taskAttemptID + " - exited : " + msg); @@ -294,7 +295,7 @@ public void fatalError(TaskAttemptID taskAttemptID, String msg) preemptionPolicy.handleFailedContainer(attemptID); context.getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID, fastFail)); } @Override @@ -312,7 +313,7 @@ public void fsError(TaskAttemptID taskAttemptID, String message) preemptionPolicy.handleFailedContainer(attemptID); context.getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index 7ae7a1e224..bd40e548c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -206,7 +206,7 @@ public Object run() throws Exception { if (taskid != null) { if (!ShutdownHookManager.get().isShutdownInProgress()) { umbilical.fatalError(taskid, - StringUtils.stringifyException(exception)); + StringUtils.stringifyException(exception), false); } } } catch (Throwable throwable) { @@ -218,7 +218,7 @@ public Object run() throws Exception { String cause = tCause == null ? throwable.getMessage() : StringUtils .stringifyException(tCause); - umbilical.fatalError(taskid, cause); + umbilical.fatalError(taskid, cause, false); } } } finally { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java new file mode 100644 index 0000000000..6ea1d15da3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +public class TaskAttemptFailEvent extends TaskAttemptEvent { + private boolean fastFail; + + /** + * Create a new TaskAttemptFailEvent, with task fastFail disabled. 
+ * + * @param id the id of the task attempt + */ + public TaskAttemptFailEvent(TaskAttemptId id) { + this(id, false); + } + + /** + * Create a new TaskAttemptFailEvent. + * + * @param id the id of the task attempt + * @param fastFail whether the task should fail fast instead of being retried. + */ + public TaskAttemptFailEvent(TaskAttemptId id, boolean fastFail) { + super(id, TaskAttemptEventType.TA_FAILMSG); + this.fastFail = fastFail; + } + + /** + * Check if the task should fail fast rather than be retried. + * @return true if the task should not be retried + */ + public boolean isFastFail() { + return fastFail; + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java new file mode 100644 index 0000000000..30392ac0a2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +public class TaskTAttemptFailedEvent extends TaskTAttemptEvent { + + private boolean fastFail; + + public TaskTAttemptFailedEvent(TaskAttemptId id) { + this(id, false); + } + + public TaskTAttemptFailedEvent(TaskAttemptId id, boolean fastFail) { + super(id, TaskEventType.T_ATTEMPT_FAILED); + this.fastFail = fastFail; + } + + public boolean isFastFail() { + return fastFail; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 431128be71..6632f277d7 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -94,6 +94,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; @@ -101,6 +102,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; @@ -194,6 +196,7 @@ public abstract class TaskAttemptImpl implements private Locality locality; private Avataar avataar; private boolean rescheduleNextAttempt = false; + private boolean failFast = false; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -1412,6 +1415,14 @@ public Avataar getAvataar() public void setAvataar(Avataar avataar) { this.avataar = avataar; } + + public void setTaskFailFast(boolean failFast) { + this.failFast = failFast; + } + + public boolean isTaskFailFast() { + return failFast; + } @SuppressWarnings("unchecked") public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo, @@ -1921,9 +1932,12 @@ public void transition(TaskAttemptImpl taskAttempt, switch(finalState) { case FAILED: - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_FAILED)); + boolean fastFail = false; + if (event instanceof TaskAttemptFailEvent) { + fastFail = ((TaskAttemptFailEvent) event).isFastFail(); + } + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent( + taskAttempt.attemptId, fastFail)); break; case KILLED: taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( @@ -2041,13 +2055,16 @@ 
public void transition(TaskAttemptImpl taskAttempt, private static class FailedTransition implements SingleArcTransition { + + @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // set the finish time taskAttempt.setFinishTime(); - notifyTaskAttemptFailed(taskAttempt); + + notifyTaskAttemptFailed(taskAttempt, taskAttempt.isTaskFailFast()); } } @@ -2154,8 +2171,8 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent( + taskAttempt.attemptId)); } } @@ -2332,6 +2349,8 @@ public void transition(TaskAttemptImpl taskAttempt, if (event instanceof TaskAttemptKillEvent) { taskAttempt.setRescheduleNextAttempt( ((TaskAttemptKillEvent)event).getRescheduleAttempt()); + } else if (event instanceof TaskAttemptFailEvent) { + taskAttempt.setTaskFailFast(((TaskAttemptFailEvent)event).isFastFail()); } } } @@ -2400,12 +2419,13 @@ public void transition(TaskAttemptImpl taskAttempt, // register it to finishing state taskAttempt.appContext.getTaskAttemptFinishingMonitor().register( taskAttempt.attemptId); - notifyTaskAttemptFailed(taskAttempt); + notifyTaskAttemptFailed(taskAttempt, false); } } @SuppressWarnings("unchecked") - private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) { + private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt, + boolean fastFail) { if (taskAttempt.getLaunchTime() == 0) { sendJHStartEventForAssignedFailTask(taskAttempt); } @@ -2419,8 +2439,8 @@ private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) { taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent( + taskAttempt.attemptId, fastFail)); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 086d4d5589..ce3b3cc596 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; @@ -1054,7 +1055,7 @@ private static class AttemptFailedTransition implements @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - 
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + TaskTAttemptFailedEvent castEvent = (TaskTAttemptFailedEvent) event; TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID(); task.failedAttempts.add(taskAttemptId); if (taskAttemptId.equals(task.commitAttempt)) { @@ -1068,7 +1069,8 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { } task.finishedAttempts.add(taskAttemptId); - if (task.failedAttempts.size() < task.maxAttempts) { + if (!castEvent.isFastFail() + && task.failedAttempts.size() < task.maxAttempts) { task.handleTaskAttemptCompletion( taskAttemptId, TaskAttemptCompletionEventStatus.FAILED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 4d3f6f4544..a2f0abaaa3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Map; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -288,8 +289,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0) {//check if it is first task // send the Fail event getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, @@ -310,8 +310,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) { //check if it is first task's first attempt // send the Fail event getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 893c4a0733..b2807c1f4e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -38,6 +38,8 @@ import java.util.Map; import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -167,9 +169,8 @@ public void testCrashed() throws Exception { /////////// Play some games with the TaskAttempts of the first task ////// //send the fail signal to the 1st map task attempt app.getContext().getEventHandler().handle( - new TaskAttemptEvent( - task1Attempt1.getID(), - TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent( + task1Attempt1.getID())); app.waitForState(task1Attempt1, 
TaskAttemptState.FAILED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 1827ce4d51..8592b20dad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -81,7 +81,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; @@ -437,8 +437,7 @@ public void testFailAbortDoesntHang() throws IOException { TaskImpl task = (TaskImpl) t; task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE)); for(TaskAttempt ta: task.getAttempts().values()) { - task.handle(new TaskTAttemptEvent(ta.getID(), - TaskEventType.T_ATTEMPT_FAILED)); + task.handle(new TaskTAttemptFailedEvent(ta.getID())); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index fe5d95dc25..43571a9b82 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; @@ -499,7 +500,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) { new TaskAttemptDiagnosticsUpdateEvent(attemptID, "Test Diagnostic Event")); getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } protected EventHandler createJobHistoryHandler( @@ -1357,8 +1358,7 @@ public void testKillMapTaskWhileFailFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); - taImpl.handle(new TaskAttemptEvent(taImpl.getID(), - TaskAttemptEventType.TA_FAILMSG)); + taImpl.handle(new TaskAttemptFailEvent(taImpl.getID())); assertEquals("Task attempt is not in FAILED state", taImpl.getState(), TaskAttemptState.FAILED); @@ -1484,8 +1484,7 @@ public void testTimeoutWhileFailFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = 
createTaskAttemptImpl(eventHandler); - taImpl.handle(new TaskAttemptEvent(taImpl.getID(), - TaskAttemptEventType.TA_FAILMSG)); + taImpl.handle(new TaskAttemptFailEvent(taImpl.getID())); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.FAILED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 62d4cc03a5..1225c4308c 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; @@ -345,8 +346,7 @@ private void killRunningTaskAttempt(TaskAttemptId attemptId, } private void failRunningTaskAttempt(TaskAttemptId attemptId) { - mockTask.handle(new TaskTAttemptEvent(attemptId, - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(attemptId)); assertTaskRunningState(); } @@ -612,11 +612,16 @@ private void runSpeculativeTaskAttemptSucceeds( // The task should now have succeeded assertTaskSucceededState(); - + // Now complete the first task attempt, after the second has succeeded - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), - firstAttemptFinishEvent)); - + if (firstAttemptFinishEvent.equals(TaskEventType.T_ATTEMPT_FAILED)) { + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempts + .get(0).getAttemptId())); + } else { + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), + firstAttemptFinishEvent)); + } + // The task should still be in the succeeded state assertTaskSucceededState(); @@ -668,8 +673,8 @@ public void testSpeculativeMapFetchFailure() { assertEquals(2, taskAttempts.size()); // speculative attempt retroactively fails from fetch failures - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempts.get(1).getAttemptId())); assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); @@ -683,8 +688,8 @@ public void testSpeculativeMapMultipleSucceedFetchFailure() { assertEquals(2, taskAttempts.size()); // speculative attempt retroactively fails from fetch failures - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempts.get(1).getAttemptId())); assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); @@ -698,8 +703,8 @@ public void testSpeculativeMapFailedFetchFailure() { assertEquals(2, taskAttempts.size()); // speculative attempt retroactively fails from fetch failures - mockTask.handle(new 
TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempts.get(1).getAttemptId())); assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); @@ -734,8 +739,8 @@ protected int getMaxAttempts() { // have the first attempt fail, verify task failed due to no retries MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); taskAttempt.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempt.getAttemptId())); assertEquals(TaskState.FAILED, mockTask.getState()); // verify task can no longer be killed @@ -757,8 +762,7 @@ protected int getMaxAttempts() { TaskEventType.T_ATTEMPT_COMMIT_PENDING)); assertEquals(TaskState.FAILED, mockTask.getState()); taskAttempt.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId())); assertEquals(TaskState.FAILED, mockTask.getState()); taskAttempt = taskAttempts.get(2); taskAttempt.setState(TaskAttemptState.SUCCEEDED); @@ -808,8 +812,7 @@ protected int getMaxAttempts() { // max attempts is 4 MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); taskAttempt.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId())); assertEquals(TaskState.RUNNING, mockTask.getState()); // verify a new attempt(#3) added because the speculative attempt(#2) @@ -829,8 +832,7 @@ protected int getMaxAttempts() { // hasn't reach the max attempts which is 4 MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1); taskAttempt1.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt1.getAttemptId())); assertEquals(TaskState.RUNNING, mockTask.getState()); // verify there's no new attempt added because of the running attempt(#3) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index c9dff6a01d..5e7a2500dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -729,9 +729,9 @@ public void shuffleError(TaskAttemptID taskId, String message) throws IOExceptio LOG.error("shuffleError: "+ message + "from task: " + taskId); } - public synchronized void fatalError(TaskAttemptID taskId, String msg) + public synchronized void fatalError(TaskAttemptID taskId, String msg, boolean fastFail) throws IOException { - LOG.error("Fatal: "+ msg + "from task: " + taskId); + LOG.error("Fatal: "+ msg + " from task: " + taskId + " fast fail: " + fastFail); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 27c89763f8..ab7cba5a39 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -1568,7 +1568,8 @@ private void checkSpillException() throws IOException { if (lspillException instanceof Error) { final String logMsg = "Task " + getTaskID() + " failed : " + StringUtils.stringifyException(lspillException); - mapTask.reportFatalError(getTaskID(), lspillException, logMsg); + mapTask.reportFatalError(getTaskID(), lspillException, logMsg, + false); } throw new IOException("Spill failed", lspillException); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 730f4ee90f..87c9e161d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -354,7 +355,7 @@ protected void setWriteSkipRecs(boolean writeSkipRecs) { * Report a fatal error to the parent (task) tracker. */ protected void reportFatalError(TaskAttemptID id, Throwable throwable, - String logMsg) { + String logMsg, boolean fastFail) { LOG.error(logMsg); if (ShutdownHookManager.get().isShutdownInProgress()) { @@ -366,7 +367,7 @@ protected void reportFatalError(TaskAttemptID id, Throwable throwable, ? StringUtils.stringifyException(throwable) : StringUtils.stringifyException(tCause); try { - umbilical.fatalError(id, cause); + umbilical.fatalError(id, cause, fastFail); } catch (IOException ioe) { LOG.error("Failed to contact the tasktracker", ioe); System.exit(-1); @@ -652,6 +653,8 @@ public class TaskReporter private Thread pingThread = null; private boolean done = true; private Object lock = new Object(); + private volatile String diskLimitCheckStatus = null; + private Thread diskLimitCheckThread = null; /** * flag that indicates whether progress update needs to be sent to parent. @@ -748,6 +751,65 @@ public TaskLimitException(String str) { } } + /** + * disk limit checker, runs in separate thread when activated. 
+ */ + public class DiskLimitCheck implements Runnable { + private LocalFileSystem localFS; + private long fsLimit; + private long checkInterval; + private String[] localDirs; + private boolean killOnLimitExceeded; + + public DiskLimitCheck(JobConf conf) throws IOException { + this.localFS = FileSystem.getLocal(conf); + this.fsLimit = conf.getLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES); + this.localDirs = conf.getLocalDirs(); + this.checkInterval = conf.getLong( + MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS); + this.killOnLimitExceeded = conf.getBoolean( + MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED); + } + + @Override + public void run() { + while (!taskDone.get()) { + try { + long localWritesSize = 0L; + String largestWorkDir = null; + for (String local : localDirs) { + long size = FileUtil.getDU(localFS.pathToFile(new Path(local))); + if (localWritesSize < size) { + localWritesSize = size; + largestWorkDir = local; + } + } + if (localWritesSize > fsLimit) { + String localStatus = + "too much data in local scratch dir=" + + largestWorkDir + + ". current size is " + + localWritesSize + + " the limit is " + fsLimit; + if (killOnLimitExceeded) { + LOG.error(localStatus); + diskLimitCheckStatus = localStatus; + } else { + LOG.warn(localStatus); + } + break; + } + Thread.sleep(checkInterval); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + } + /** * check the counters to see whether the task has exceeded any configured * limits. @@ -773,6 +835,9 @@ protected void checkTaskLimits() throws TaskLimitException { " the limit is " + limit); } } + if (diskLimitCheckStatus != null) { + throw new TaskLimitException(diskLimitCheckStatus); + } } /** @@ -851,7 +916,7 @@ public void run() { StringUtils.stringifyException(e); LOG.error(errMsg); try { - umbilical.fatalError(taskId, errMsg); + umbilical.fatalError(taskId, errMsg, true); } catch (IOException ioe) { LOG.error("Failed to update failure diagnosis", ioe); } @@ -884,6 +949,22 @@ public void startCommunicationThread() { pingThread.setDaemon(true); pingThread.start(); } + startDiskLimitCheckerThreadIfNeeded(); + } + public void startDiskLimitCheckerThreadIfNeeded() { + if (diskLimitCheckThread == null && conf.getLong( + MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES) >= 0) { + try { + diskLimitCheckThread = new Thread(new DiskLimitCheck(conf), + "disk limit check thread"); + diskLimitCheckThread.setDaemon(true); + diskLimitCheckThread.start(); + } catch (IOException e) { + LOG.error("Issues starting disk monitor thread: " + + e.getMessage(), e); + } + } } public void stopCommunicationThread() throws InterruptedException { if (pingThread != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java index c3678d6706..041ab39080 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java @@ -68,9 +68,10 
@@ public interface TaskUmbilicalProtocol extends VersionedProtocol { * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516 * Version 19 Added fatalError for child to communicate fatal errors to TT * Version 20 Added methods to manage checkpoints + * Version 21 Added fastFail parameter to fatalError * */ - public static final long versionID = 20L; + public static final long versionID = 21L; /** * Called when a child task process starts, to get its task. @@ -140,8 +141,13 @@ void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) /** Report that the task encounted a local filesystem error.*/ void fsError(TaskAttemptID taskId, String message) throws IOException; - /** Report that the task encounted a fatal error.*/ - void fatalError(TaskAttemptID taskId, String message) throws IOException; + /** + * Report that the task encountered a fatal error. + * @param taskId the id of the task + * @param message the fatal error message + * @param fastFail flag to enable fast fail for the task + */ + void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException; /** Called by a reduce task to get the map output locations for finished maps. * Returns an update centered around the map-task-completion-events. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 6acf1bc8dd..ca18bfe5a2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -52,6 +52,20 @@ public interface MRJobConfig { public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed"; + public static final String JOB_SINGLE_DISK_LIMIT_BYTES = + "mapreduce.job.local-fs.single-disk-limit.bytes"; + // negative values disable the limit + public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1; + + public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = + "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed"; + // setting to false only logs the limit violation instead of killing the task + public static final boolean DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = true; + + public static final String JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = + "mapreduce.job.local-fs.single-disk-limit.check.interval-ms"; + public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = 5000; + public static final String TASK_LOCAL_WRITE_LIMIT_BYTES = "mapreduce.task.local-fs.write-limit.bytes"; // negative values disable the limit diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 62f3dfa127..72f509c1d8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -62,6 +62,28 @@ set to less than .5 </description> </property> + + <property> + <name>mapreduce.job.local-fs.single-disk-limit.bytes</name> + <value>-1</value> + <description>Enable an in-task monitor thread that watches the disk + consumption of a job on each local disk. 
When this is set to a positive number of bytes, the task will fast + fail once the limit is reached on any single disk. This is a per-disk configuration. + </description> + </property> + + <property> + <name>mapreduce.job.local-fs.single-disk-limit.check.interval-ms</name> + <value>5000</value> + <description>Interval, in milliseconds, at which the disk limit check runs. + </description> + </property> + + <property> + <name>mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed</name> + <value>true</value> + <description>If mapreduce.job.local-fs.single-disk-limit.bytes is exceeded, + this controls whether the task is killed or the violation only logged. If false, the intent + to kill the task is only logged in the container logs. + </description> + </property> <property> <name>mapreduce.job.maps</name> <value>2</value> diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java index 18442d64e5..e5ff64e317 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java @@ -18,15 +18,19 @@ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import java.util.Random; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapred.SortedRanges.Range; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.util.ExitUtil; @@ -43,6 +47,11 @@ public class TestTaskProgressReporter { private FakeUmbilical fakeUmbilical = new FakeUmbilical(); + private static final String TEST_DIR = + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")) + "/" + + TestTaskProgressReporter.class.getName(); + private static class DummyTask extends Task { @Override public void run(JobConf job, TaskUmbilicalProtocol umbilical) @@ -53,6 +62,11 @@ public void run(JobConf job, TaskUmbilicalProtocol umbilical) public boolean isMapTask() { return true; } + + @Override + public boolean isCommitRequired() { + return false; + } } private static class FakeUmbilical implements TaskUmbilicalProtocol { @@ -118,7 +132,7 @@ public void fsError(TaskAttemptID taskId, String message) } @Override - public void fatalError(TaskAttemptID taskId, String message) + public void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException { } @@ -163,6 +177,78 @@ protected void checkTaskLimits() throws TaskLimitException { } } + @Test(timeout=60000) + public void testScratchDirSize() throws Exception { + String tmpPath = TEST_DIR + "/testBytesWrittenLimit-tmpFile-" + + new Random(System.currentTimeMillis()).nextInt(); + File data = new File(tmpPath + "/out"); + File testDir = new File(tmpPath); + testDir.mkdirs(); + testDir.deleteOnExit(); + JobConf conf = new JobConf(); + conf.setStrings(MRConfig.LOCAL_DIR, "file://" + tmpPath); + conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 1024L); + conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + true); + getBaseConfAndWriteToFile(-1, data); + testScratchDirLimit(false, conf); + data.delete(); + getBaseConfAndWriteToFile(100, 
data); + testScratchDirLimit(false, conf); + data.delete(); + getBaseConfAndWriteToFile(1536, data); + testScratchDirLimit(true, conf); + conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + false); + testScratchDirLimit(false, conf); + conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + true); + conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, -1L); + testScratchDirLimit(false, conf); + data.delete(); + FileUtil.fullyDelete(testDir); + } + + private void getBaseConfAndWriteToFile(int size, File data) + throws IOException { + if (size > 0) { + byte[] b = new byte[size]; + for (int i = 0; i < size; i++) { + b[i] = 1; + } + FileUtils.writeByteArrayToFile(data, b); + } + } + + public void testScratchDirLimit(boolean fastFail, JobConf conf) + throws Exception { + ExitUtil.disableSystemExit(); + threadExited = false; + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + if (ex instanceof ExitUtil.ExitException) { + threadExited = true; + th.interrupt(); + } + } + }; + Task task = new DummyTask(); + task.setConf(conf); + DummyTaskReporter reporter = new DummyTaskReporter(task); + reporter.startDiskLimitCheckerThreadIfNeeded(); + Thread t = new Thread(reporter); + t.setUncaughtExceptionHandler(h); + reporter.setProgressFlag(); + t.start(); + while (!reporter.taskLimitIsChecked) { + Thread.yield(); + } + task.done(fakeUmbilical, reporter); + reporter.resetDoneFlag(); + t.join(1000L); + Assert.assertEquals(fastFail, threadExited); + } + @Test (timeout=10000) public void testTaskProgress() throws Exception { JobConf job = new JobConf(); @@ -214,7 +300,7 @@ public void uncaughtException(Thread th, Throwable ex) { conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0); conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit); LocalFileSystem localFS = FileSystem.getLocal(conf); - Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-" + Path tmpPath = new Path(TEST_DIR + "/testBytesWrittenLimit-tmpFile-" + new Random(System.currentTimeMillis()).nextInt()); FSDataOutputStream out = localFS.create(tmpPath, true); out.write(new byte[LOCAL_BYTES_WRITTEN]); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 83e35fecca..7b70f980b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -36,6 +36,7 @@ import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -712,7 +713,7 @@ public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new 
TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); @@ -732,7 +733,7 @@ public MRAppWithHistoryWithFailedTask(int maps, int reduces, protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); @@ -760,10 +761,10 @@ protected void attemptLaunched(TaskAttemptId attemptID) { new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL)); } else if (taskType == TaskType.MAP && taskId == 1) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else if (taskType == TaskType.REDUCE && taskId == 0) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else if (taskType == TaskType.REDUCE && taskId == 1) { getContext().getEventHandler().handle( new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java index f364c18c0d..9b6ebda593 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java @@ -91,8 +91,8 @@ public void shuffleError(TaskAttemptID taskId, String message) throws IOExceptio LOG.info("Task " + taskId + " reporting shuffle error: " + message); } - public void fatalError(TaskAttemptID taskId, String msg) throws IOException { - LOG.info("Task " + taskId + " reporting fatal error: " + msg); + public void fatalError(TaskAttemptID taskId, String msg, boolean fastFail) throws IOException { + LOG.info("Task " + taskId + " reporting fatal error: " + msg + " fast fail: " + fastFail); } public JvmTask getTask(JvmContext context) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java index bed545e36c..a534cfaff0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java @@ -124,7 +124,7 @@ public void done(TaskAttemptID taskid) throws IOException { } @Override - public void fatalError(TaskAttemptID taskId, String message) + public void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException { } @Override
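
Reviewer note: the behavioural core of the patch is the retry decision in TaskImpl.AttemptFailedTransition quoted above. The following self-contained model (class and method names are hypothetical, not part of the patch) is a minimal sketch of the intended semantics: a fast-failing attempt skips the remaining retries, while a normal failure reschedules until maxAttempts is reached.

```java
// Illustrative model of the guard added in TaskImpl.AttemptFailedTransition:
//   if (!castEvent.isFastFail() && task.failedAttempts.size() < task.maxAttempts)
public class FastFailSketch {

  // Returns the resulting task state for a failed attempt.
  static String nextTaskState(boolean fastFail, int failedAttempts, int maxAttempts) {
    if (!fastFail && failedAttempts < maxAttempts) {
      return "SCHEDULED"; // a new attempt is added and the task keeps running
    }
    return "FAILED";      // fast fail requested, or retries exhausted
  }

  public static void main(String[] args) {
    System.out.println(nextTaskState(false, 1, 4)); // SCHEDULED: normal retry path
    System.out.println(nextTaskState(true, 1, 4));  // FAILED: fastFail skips retries
    System.out.println(nextTaskState(false, 4, 4)); // FAILED: attempts exhausted
  }
}
```

This is why TaskReporter.run() passes true to umbilical.fatalError() for a TaskLimitException: a task that blew a configured limit would blow it again on retry, so retrying only wastes attempts.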
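Reviewer note: the other half of the patch is the DiskLimitCheck monitor added to Task.java. Below is a minimal stand-alone approximation of that loop, using plain java.nio in place of Hadoop's FileUtil.getDU(); all names are illustrative and this sketches the control flow, not the shipped implementation.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class DiskLimitLoopSketch {

  // Recursively sums file sizes under a directory, analogous to FileUtil.getDU().
  static long diskUsage(Path dir) throws IOException {
    final long[] total = {0L};
    Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
        total[0] += attrs.size();
        return FileVisitResult.CONTINUE;
      }
    });
    return total[0];
  }

  public static void main(String[] args) throws Exception {
    long limitBytes = 1024L; // mapreduce.job.local-fs.single-disk-limit.bytes
    long intervalMs = 5000L; // mapreduce.job.local-fs.single-disk-limit.check.interval-ms
    Path[] localDirs = {Paths.get(args[0])};
    while (true) {
      long largestSize = 0L;
      Path largestDir = null;
      // Find the local dir with the largest usage, as DiskLimitCheck.run() does.
      for (Path dir : localDirs) {
        long size = diskUsage(dir);
        if (size > largestSize) {
          largestSize = size;
          largestDir = dir;
        }
      }
      if (largestSize > limitBytes) {
        // In the patch this status is stored and later surfaced by
        // checkTaskLimits() as a TaskLimitException, which in turn reaches
        // umbilical.fatalError(taskId, msg, true) -- the fast-fail path.
        System.err.println("too much data in local scratch dir=" + largestDir
            + ". current size is " + largestSize + " the limit is " + limitBytes);
        break;
      }
      Thread.sleep(intervalMs);
    }
  }
}
```

A job opts in by setting mapreduce.job.local-fs.single-disk-limit.bytes to a positive value, which is what the new testScratchDirSize test does via conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 1024L).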