From 022f7b4a25c73b8c43985e8d1bac717b96373ac6 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 19 Oct 2012 05:57:57 +0000 Subject: [PATCH] MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. Contributed by Siddharth Seth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1399976 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/JobStateInternal.java | 30 ++ .../v2/app/job/TaskAttemptStateInternal.java | 42 +++ .../v2/app/job/TaskStateInternal.java | 23 ++ .../mapreduce/v2/app/job/impl/JobImpl.java | 204 +++++++------ .../v2/app/job/impl/TaskAttemptImpl.java | 289 ++++++++++-------- .../mapreduce/v2/app/job/impl/TaskImpl.java | 165 +++++----- .../mapreduce/v2/app/rm/RMCommunicator.java | 14 +- .../v2/app/speculate/DefaultSpeculator.java | 2 +- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 31 +- .../hadoop/mapreduce/v2/app/TestFail.java | 5 +- .../v2/app/TestRMContainerAllocator.java | 11 +- .../v2/app/job/impl/TestJobImpl.java | 26 +- .../v2/app/job/impl/TestTaskImpl.java | 5 +- .../app/launcher/TestContainerLauncher.java | 5 +- .../hadoop/mapreduce/TypeConverter.java | 26 +- .../mapreduce/v2/api/records/JobState.java | 1 - .../v2/api/records/TaskAttemptState.java | 14 +- .../mapreduce/v2/api/records/TaskState.java | 2 +- .../hadoop/mapreduce/v2/util/MRApps.java | 11 +- .../src/main/proto/mr_protos.proto | 26 +- 21 files changed, 570 insertions(+), 365 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java create mode 100644 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7c73fe4801..93bb8f55c5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -570,6 +570,9 @@ Release 0.23.5 - UNRELEASED IMPROVEMENTS + MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol + for Job, Task and TaskAttempt. (Siddharth Seth via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java new file mode 100644 index 0000000000..476783089b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.app.job; + +public enum JobStateInternal { + NEW, + INITED, + RUNNING, + SUCCEEDED, + FAILED, + KILL_WAIT, + KILLED, + ERROR +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java new file mode 100644 index 0000000000..f6c3e57244 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java @@ -0,0 +1,42 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job; + +import org.apache.hadoop.classification.InterfaceAudience.Private; + +/** +* TaskAttemptImpl internal state machine states. 
+* +*/ +@Private +public enum TaskAttemptStateInternal { + NEW, + UNASSIGNED, + ASSIGNED, + RUNNING, + COMMIT_PENDING, + SUCCESS_CONTAINER_CLEANUP, + SUCCEEDED, + FAIL_CONTAINER_CLEANUP, + FAIL_TASK_CLEANUP, + FAILED, + KILL_CONTAINER_CLEANUP, + KILL_TASK_CLEANUP, + KILLED, +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java new file mode 100644 index 0000000000..ee3c8c3327 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.app.job; + +public enum TaskStateInternal { + NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 3de74194ea..2fd9757ff6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; @@ -210,163 +211,163 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, new UpdatedNodesTransition(); protected static final - StateMachineFactory + StateMachineFactory stateMachineFactory - = new StateMachineFactory - (JobState.NEW) + = new StateMachineFactory + (JobStateInternal.NEW) // Transitions from NEW state - .addTransition(JobState.NEW, JobState.NEW, + .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.NEW, JobState.NEW, + .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_COUNTER_UPDATE, 
COUNTER_UPDATE_TRANSITION) .addTransition - (JobState.NEW, - EnumSet.of(JobState.INITED, JobState.FAILED), + (JobStateInternal.NEW, + EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), JobEventType.JOB_INIT, new InitTransition()) - .addTransition(JobState.NEW, JobState.KILLED, + .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED, JobEventType.JOB_KILL, new KillNewJobTransition()) - .addTransition(JobState.NEW, JobState.ERROR, + .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(JobState.NEW, JobState.NEW, + .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_UPDATED_NODES) // Transitions from INITED state - .addTransition(JobState.INITED, JobState.INITED, + .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.INITED, JobState.INITED, + .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) - .addTransition(JobState.INITED, JobState.RUNNING, + .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING, JobEventType.JOB_START, new StartTransition()) - .addTransition(JobState.INITED, JobState.KILLED, + .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED, JobEventType.JOB_KILL, new KillInitedJobTransition()) - .addTransition(JobState.INITED, JobState.ERROR, + .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(JobState.INITED, JobState.INITED, + .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_UPDATED_NODES) // Transitions from RUNNING state - .addTransition(JobState.RUNNING, JobState.RUNNING, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 
JobEventType.JOB_TASK_ATTEMPT_COMPLETED, TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) .addTransition - (JobState.RUNNING, - EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED), + (JobStateInternal.RUNNING, + EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED), JobEventType.JOB_TASK_COMPLETED, new TaskCompletedTransition()) .addTransition - (JobState.RUNNING, - EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED), + (JobStateInternal.RUNNING, + EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED), JobEventType.JOB_COMPLETED, new JobNoTasksCompletedTransition()) - .addTransition(JobState.RUNNING, JobState.KILL_WAIT, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT, JobEventType.JOB_KILL, new KillTasksTransition()) - .addTransition(JobState.RUNNING, JobState.RUNNING, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, JobEventType.JOB_UPDATED_NODES, UPDATED_NODES_TRANSITION) - .addTransition(JobState.RUNNING, JobState.RUNNING, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, JobEventType.JOB_MAP_TASK_RESCHEDULED, new MapTaskRescheduledTransition()) - .addTransition(JobState.RUNNING, JobState.RUNNING, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.RUNNING, JobState.RUNNING, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) - .addTransition(JobState.RUNNING, JobState.RUNNING, + .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, new TaskAttemptFetchFailureTransition()) .addTransition( - JobState.RUNNING, - JobState.ERROR, JobEventType.INTERNAL_ERROR, + JobStateInternal.RUNNING, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // 
Transitions from KILL_WAIT state. .addTransition - (JobState.KILL_WAIT, - EnumSet.of(JobState.KILL_WAIT, JobState.KILLED), + (JobStateInternal.KILL_WAIT, + EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED), JobEventType.JOB_TASK_COMPLETED, new KillWaitTaskCompletedTransition()) - .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, + .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) - .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, + .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, + .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition( - JobState.KILL_WAIT, - JobState.ERROR, JobEventType.INTERNAL_ERROR, + JobStateInternal.KILL_WAIT, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, + .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from SUCCEEDED state - .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, + .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, + .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition( - JobState.SUCCEEDED, - JobState.ERROR, JobEventType.INTERNAL_ERROR, + JobStateInternal.SUCCEEDED, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 
INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, + .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from FAILED state - .addTransition(JobState.FAILED, JobState.FAILED, + .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.FAILED, JobState.FAILED, + .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition( - JobState.FAILED, - JobState.ERROR, JobEventType.INTERNAL_ERROR, + JobStateInternal.FAILED, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(JobState.FAILED, JobState.FAILED, + .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from KILLED state - .addTransition(JobState.KILLED, JobState.KILLED, + .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobState.KILLED, JobState.KILLED, + .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition( - JobState.KILLED, - JobState.ERROR, JobEventType.INTERNAL_ERROR, + JobStateInternal.KILLED, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(JobState.KILLED, JobState.KILLED, + .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // No transitions from 
INTERNAL_ERROR state. Ignore all. .addTransition( - JobState.ERROR, - JobState.ERROR, + JobStateInternal.ERROR, + JobStateInternal.ERROR, EnumSet.of(JobEventType.JOB_INIT, JobEventType.JOB_KILL, JobEventType.JOB_TASK_COMPLETED, @@ -376,12 +377,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.INTERNAL_ERROR)) - .addTransition(JobState.ERROR, JobState.ERROR, + .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) // create the topology tables .installTopology(); - private final StateMachine stateMachine; + private final StateMachine stateMachine; //changing fields while the job is running private int numMapTasks; @@ -446,7 +447,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, stateMachine = stateMachineFactory.make(this); } - protected StateMachine getStateMachine() { + protected StateMachine getStateMachine() { return stateMachine; } @@ -520,9 +521,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, readLock.lock(); try { - JobState state = getState(); - if (state == JobState.ERROR || state == JobState.FAILED - || state == JobState.KILLED || state == JobState.SUCCEEDED) { + JobStateInternal state = getInternalState(); + if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED + || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) { this.mayBeConstructFinalFullCounters(); return fullCounters; } @@ -587,7 +588,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, diagsb.append(s).append("\n"); } - if (getState() == JobState.NEW) { + if (getInternalState() == JobStateInternal.NEW) { return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, cleanupProgress, jobFile, amInfos, isUber, 
diagsb.toString()); @@ -674,7 +675,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public JobState getState() { readLock.lock(); try { - return getStateMachine().getCurrentState(); + return getExternalState(getStateMachine().getCurrentState()); } finally { readLock.unlock(); } @@ -695,7 +696,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, LOG.debug("Processing " + event.getJobId() + " of type " + event.getType()); try { writeLock.lock(); - JobState oldState = getState(); + JobStateInternal oldState = getInternalState(); try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { @@ -706,9 +707,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.INTERNAL_ERROR)); } //notify the eventhandler of state change - if (oldState != getState()) { + if (oldState != getInternalState()) { LOG.info(jobId + "Job Transitioned from " + oldState + " to " - + getState()); + + getInternalState()); } } @@ -717,6 +718,25 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } + @Private + public JobStateInternal getInternalState() { + readLock.lock(); + try { + return getStateMachine().getCurrentState(); + } finally { + readLock.unlock(); + } + } + + private static JobState getExternalState(JobStateInternal smState) { + if (smState == JobStateInternal.KILL_WAIT) { + return JobState.KILLED; + } else { + return JobState.valueOf(smState.name()); + } + } + + //helpful in testing protected void addTask(Task task) { synchronized (tasksSyncHandle) { @@ -757,7 +777,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, return FileSystem.get(conf); } - static JobState checkJobCompleteSuccess(JobImpl job) { + static JobStateInternal checkJobCompleteSuccess(JobImpl job) { // check for Job success if (job.completedTaskCount == job.tasks.size()) { try { @@ -767,16 +787,16 @@ public class JobImpl 
implements org.apache.hadoop.mapreduce.v2.app.job.Job, LOG.error("Could not do commit for Job", e); job.addDiagnostic("Job commit failed: " + e.getMessage()); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); - return job.finished(JobState.FAILED); + return job.finished(JobStateInternal.FAILED); } job.logJobHistoryFinishedEvent(); - return job.finished(JobState.SUCCEEDED); + return job.finished(JobStateInternal.SUCCEEDED); } return null; } - JobState finished(JobState finalState) { - if (getState() == JobState.RUNNING) { + JobStateInternal finished(JobStateInternal finalState) { + if (getInternalState() == JobStateInternal.RUNNING) { metrics.endRunningJob(this); } if (finishTime == 0) setFinishTime(); @@ -989,7 +1009,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, */ public static class InitTransition - implements MultipleArcTransition { + implements MultipleArcTransition { /** * Note that this transition method is called directly (and synchronously) @@ -999,7 +1019,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, * way; MR version is). 
*/ @Override - public JobState transition(JobImpl job, JobEvent event) { + public JobStateInternal transition(JobImpl job, JobEvent event) { job.metrics.submittedJob(job); job.metrics.preparingJob(job); try { @@ -1065,7 +1085,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, createReduceTasks(job); job.metrics.endPreparingJob(job); - return JobState.INITED; + return JobStateInternal.INITED; //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition) } catch (IOException e) { @@ -1074,7 +1094,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, + StringUtils.stringifyException(e)); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); job.metrics.endPreparingJob(job); - return job.finished(JobState.FAILED); + return job.finished(JobStateInternal.FAILED); } } @@ -1282,9 +1302,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - JobState.KILLED.toString()); + JobStateInternal.KILLED.toString()); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); - job.finished(JobState.KILLED); + job.finished(JobStateInternal.KILLED); } } @@ -1294,7 +1314,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public void transition(JobImpl job, JobEvent event) { job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); job.addDiagnostic("Job received Kill in INITED state."); - job.finished(JobState.KILLED); + job.finished(JobStateInternal.KILLED); } } @@ -1394,10 +1414,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } private static class TaskCompletedTransition implements - MultipleArcTransition { + MultipleArcTransition { @Override - public JobState transition(JobImpl job, JobEvent event) { + public JobStateInternal transition(JobImpl job, JobEvent event) { 
job.completedTaskCount++; LOG.info("Num completed Tasks: " + job.completedTaskCount); JobTaskEvent taskEvent = (JobTaskEvent) event; @@ -1413,7 +1433,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, return checkJobForCompletion(job); } - protected JobState checkJobForCompletion(JobImpl job) { + protected JobStateInternal checkJobForCompletion(JobImpl job) { //check for Job failure if (job.failedMapTaskCount*100 > job.allowedMapFailuresPercent*job.numMapTasks || @@ -1427,16 +1447,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, LOG.info(diagnosticMsg); job.addDiagnostic(diagnosticMsg); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); - return job.finished(JobState.FAILED); + return job.finished(JobStateInternal.FAILED); } - JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job); + JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job); if (jobCompleteSuccess != null) { return jobCompleteSuccess; } //return the current state, Job not finished yet - return job.getState(); + return job.getInternalState(); } private void taskSucceeded(JobImpl job, Task task) { @@ -1470,17 +1490,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // Transition class for handling jobs with no tasks static class JobNoTasksCompletedTransition implements - MultipleArcTransition { + MultipleArcTransition { @Override - public JobState transition(JobImpl job, JobEvent event) { - JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job); + public JobStateInternal transition(JobImpl job, JobEvent event) { + JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job); if (jobCompleteSuccess != null) { return jobCompleteSuccess; } // Return the current state, Job not finished yet - return job.getState(); + return job.getInternalState(); } } @@ -1497,14 +1517,14 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job, private static class KillWaitTaskCompletedTransition extends TaskCompletedTransition { @Override - protected JobState checkJobForCompletion(JobImpl job) { + protected JobStateInternal checkJobForCompletion(JobImpl job) { if (job.completedTaskCount == job.tasks.size()) { job.setFinishTime(); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); - return job.finished(JobState.KILLED); + return job.finished(JobStateInternal.KILLED); } //return the current state, Job not finished yet - return job.getState(); + return job.getInternalState(); } } @@ -1558,9 +1578,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - JobState.ERROR.toString()); + JobStateInternal.ERROR.toString()); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); - job.finished(JobState.ERROR); + job.finished(JobStateInternal.ERROR); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 6b4709c06b..687edc379c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -39,6 +39,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -72,10 +73,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; @@ -88,7 +89,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; @@ -132,6 +132,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.RackResolver; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -184,149 +185,149 @@ public abstract class TaskAttemptImpl implements = new DiagnosticInformationUpdater(); private static final StateMachineFactory - + stateMachineFactory = new StateMachineFactory - - 
(TaskAttemptState.NEW) + + (TaskAttemptStateInternal.NEW) // Transitions from the NEW state. - .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED, + .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED, TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false)) - .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED, + .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED, TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true)) - .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED, + .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new KilledTransition()) - .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED, + .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) // Transitions from the UNASSIGNED state. - .addTransition(TaskAttemptState.UNASSIGNED, - TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, + .addTransition(TaskAttemptStateInternal.UNASSIGNED, + TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, new ContainerAssignedTransition()) - .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED, + .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( - TaskAttemptState.KILLED, true)) - .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED, + TaskAttemptStateInternal.KILLED, true)) + .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( - TaskAttemptState.FAILED, true)) + TaskAttemptStateInternal.FAILED, true)) // Transitions from the ASSIGNED state. 
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING, + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_CONTAINER_LAUNCHED, new LaunchedContainerTransition()) - .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED, + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) - .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED, + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, - new DeallocateContainerTransition(TaskAttemptState.FAILED, false)) - .addTransition(TaskAttemptState.ASSIGNED, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false)) + .addTransition(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_CONTAINER_COMPLETED, CLEANUP_CONTAINER_TRANSITION) // ^ If RM kills the container due to expiry, preemption etc. - .addTransition(TaskAttemptState.ASSIGNED, - TaskAttemptState.KILL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) - .addTransition(TaskAttemptState.ASSIGNED, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) // Transitions from RUNNING state. 
- .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, + .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_UPDATE, new StatusUpdater()) - .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, + .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // If no commit is required, task directly goes to success - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) // If commit is required, task goes through commit pending state. - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.COMMIT_PENDING, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) // Failure handling while RUNNING - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) //for handling container exit without sending the done or fail msg - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_CONTAINER_COMPLETED, CLEANUP_CONTAINER_TRANSITION) // Timeout handling while RUNNING - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) // if container killed by AM 
shutting down - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.KILLED, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) // Kill handling - .addTransition(TaskAttemptState.RUNNING, - TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) // Transitions from COMMIT_PENDING state - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, new StatusUpdater()) - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.COMMIT_PENDING, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) // if container killed by AM shutting down - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.KILLED, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) - 
.addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_CONTAINER_COMPLETED, CLEANUP_CONTAINER_TRANSITION) - .addTransition(TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) // Transitions from SUCCESS_CONTAINER_CLEANUP state // kill and cleanup the container - .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, + .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, new SucceededTransition()) .addTransition( - TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events - .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) // Transitions from FAIL_CONTAINER_CLEANUP state. 
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, - TaskAttemptState.FAIL_TASK_CLEANUP, + .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAIL_TASK_CLEANUP, TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) - .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events - .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_UPDATE, @@ -339,17 +340,17 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_TIMED_OUT)) // Transitions from KILL_CONTAINER_CLEANUP - .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP, - TaskAttemptState.KILL_TASK_CLEANUP, + .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.KILL_TASK_CLEANUP, TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) - .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP, - TaskAttemptState.KILL_CONTAINER_CLEANUP, + .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events .addTransition( - TaskAttemptState.KILL_CONTAINER_CLEANUP, - TaskAttemptState.KILL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_COMPLETED, 
TaskAttemptEventType.TA_UPDATE, @@ -362,16 +363,16 @@ public abstract class TaskAttemptImpl implements // Transitions from FAIL_TASK_CLEANUP // run the task cleanup - .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, - TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE, + .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE, new FailedTransition()) - .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, - TaskAttemptState.FAIL_TASK_CLEANUP, + .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP, + TaskAttemptStateInternal.FAIL_TASK_CLEANUP, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events - .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, - TaskAttemptState.FAIL_TASK_CLEANUP, + .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP, + TaskAttemptStateInternal.FAIL_TASK_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_UPDATE, @@ -384,16 +385,16 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) // Transitions from KILL_TASK_CLEANUP - .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, - TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE, + .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP, + TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE, new KilledTransition()) - .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, - TaskAttemptState.KILL_TASK_CLEANUP, + .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP, + TaskAttemptStateInternal.KILL_TASK_CLEANUP, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events - .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, - TaskAttemptState.KILL_TASK_CLEANUP, + .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP, + 
TaskAttemptStateInternal.KILL_TASK_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_UPDATE, @@ -406,31 +407,31 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) // Transitions from SUCCEEDED - .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts - TaskAttemptState.FAILED, + .addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE, new TooManyFetchFailureTransition()) - .addTransition(TaskAttemptState.SUCCEEDED, - EnumSet.of(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED), + .addTransition(TaskAttemptStateInternal.SUCCEEDED, + EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED), TaskAttemptEventType.TA_KILL, new KilledAfterSuccessTransition()) .addTransition( - TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, + TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events for SUCCEEDED state - .addTransition(TaskAttemptState.SUCCEEDED, - TaskAttemptState.SUCCEEDED, + .addTransition(TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) // Transitions from FAILED state - .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, + .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events for FAILED state - .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, + .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, 
EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, @@ -445,11 +446,11 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)) // Transitions from KILLED state - .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, + .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events for KILLED state - .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, + .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, @@ -466,7 +467,7 @@ public abstract class TaskAttemptImpl implements .installTopology(); private final StateMachine - + stateMachine; private ContainerId containerID; @@ -874,9 +875,9 @@ public abstract class TaskAttemptImpl implements readLock.lock(); try { // TODO: Use stateMachine level method? 
- return (getState() == TaskAttemptState.SUCCEEDED || - getState() == TaskAttemptState.FAILED || - getState() == TaskAttemptState.KILLED); + return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED || + getInternalState() == TaskAttemptStateInternal.FAILED || + getInternalState() == TaskAttemptStateInternal.KILLED); } finally { readLock.unlock(); } @@ -953,7 +954,7 @@ public abstract class TaskAttemptImpl implements public TaskAttemptState getState() { readLock.lock(); try { - return stateMachine.getCurrentState(); + return getExternalState(stateMachine.getCurrentState()); } finally { readLock.unlock(); } @@ -968,7 +969,7 @@ public abstract class TaskAttemptImpl implements } writeLock.lock(); try { - final TaskAttemptState oldState = getState(); + final TaskAttemptStateInternal oldState = getInternalState() ; try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { @@ -980,16 +981,58 @@ public abstract class TaskAttemptImpl implements eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(), JobEventType.INTERNAL_ERROR)); } - if (oldState != getState()) { + if (oldState != getInternalState()) { LOG.info(attemptId + " TaskAttempt Transitioned from " + oldState + " to " - + getState()); + + getInternalState()); } } finally { writeLock.unlock(); } } + @VisibleForTesting + public TaskAttemptStateInternal getInternalState() { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + private static TaskAttemptState getExternalState( + TaskAttemptStateInternal smState) { + switch (smState) { + case ASSIGNED: + case UNASSIGNED: + return TaskAttemptState.STARTING; + case COMMIT_PENDING: + return TaskAttemptState.COMMIT_PENDING; + case FAILED: + return TaskAttemptState.FAILED; + case KILLED: + return TaskAttemptState.KILLED; + // All CLEANUP states considered as RUNNING since events have not gone out + // to the Task yet. 
May be possible to consider them as a Finished state. + case FAIL_CONTAINER_CLEANUP: + case FAIL_TASK_CLEANUP: + case KILL_CONTAINER_CLEANUP: + case KILL_TASK_CLEANUP: + case SUCCESS_CONTAINER_CLEANUP: + case RUNNING: + return TaskAttemptState.RUNNING; + case NEW: + return TaskAttemptState.NEW; + case SUCCEEDED: + return TaskAttemptState.SUCCEEDED; + default: + throw new YarnException("Attempt to convert invalid " + + "stateMachineTaskAttemptState to externalTaskAttemptState: " + + smState); + } + } + //always called in write lock private void setFinishTime() { //set the finish time only if launch time is set @@ -1066,7 +1109,7 @@ public abstract class TaskAttemptImpl implements private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt, - TaskAttemptState attemptState) { + TaskAttemptStateInternal attemptState) { TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent( TypeConverter.fromYarn(taskAttempt.attemptId), @@ -1247,10 +1290,10 @@ public abstract class TaskAttemptImpl implements private static class DeallocateContainerTransition implements SingleArcTransition { - private final TaskAttemptState finalState; + private final TaskAttemptStateInternal finalState; private final boolean withdrawsContainerRequest; DeallocateContainerTransition - (TaskAttemptState finalState, boolean withdrawsContainerRequest) { + (TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) { this.finalState = finalState; this.withdrawsContainerRequest = withdrawsContainerRequest; } @@ -1288,10 +1331,10 @@ public abstract class TaskAttemptImpl implements TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, finalState); - if(finalState == TaskAttemptState.FAILED) { + if(finalState == TaskAttemptStateInternal.FAILED) { taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); - } else 
if(finalState == TaskAttemptState.KILLED) { + } else if(finalState == TaskAttemptStateInternal.KILLED) { taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); } @@ -1405,7 +1448,7 @@ public abstract class TaskAttemptImpl implements JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES, slotMillis); taskAttempt.eventHandler.handle(jce); - taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED); + taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_SUCCEEDED)); @@ -1428,10 +1471,10 @@ public abstract class TaskAttemptImpl implements .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptState.FAILED); + TaskAttemptStateInternal.FAILED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not + // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not // handling failed map/reduce events. }else { LOG.debug("Not generating HistoryFinish event since start event not " + @@ -1443,7 +1486,7 @@ public abstract class TaskAttemptImpl implements } @SuppressWarnings({ "unchecked" }) - private void logAttemptFinishedEvent(TaskAttemptState state) { + private void logAttemptFinishedEvent(TaskAttemptStateInternal state) { //Log finished events only if an attempt started. 
if (getLaunchTime() == 0) return; if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { @@ -1500,7 +1543,7 @@ public abstract class TaskAttemptImpl implements .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptState.FAILED); + TaskAttemptStateInternal.FAILED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); }else { @@ -1513,11 +1556,11 @@ public abstract class TaskAttemptImpl implements } private static class KilledAfterSuccessTransition implements - MultipleArcTransition { + MultipleArcTransition { @SuppressWarnings("unchecked") @Override - public TaskAttemptState transition(TaskAttemptImpl taskAttempt, + public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) { // after a reduce task has succeeded, its outputs are in safe in HDFS. 
@@ -1530,7 +1573,7 @@ public abstract class TaskAttemptImpl implements // ignore this for reduce tasks LOG.info("Ignoring killed event for successful reduce task attempt" + taskAttempt.getID().toString()); - return TaskAttemptState.SUCCEEDED; + return TaskAttemptStateInternal.SUCCEEDED; } if(event instanceof TaskAttemptKillEvent) { TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event; @@ -1545,12 +1588,12 @@ public abstract class TaskAttemptImpl implements taskAttempt.eventHandler .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( - taskAttempt, TaskAttemptState.KILLED); + taskAttempt, TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId .getTaskId().getJobId(), tauce)); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); - return TaskAttemptState.KILLED; + return TaskAttemptStateInternal.KILLED; } } @@ -1568,14 +1611,14 @@ public abstract class TaskAttemptImpl implements .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptState.KILLED); + TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); }else { LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } -// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure. +// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. 
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index c717d96fce..830603b77e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; -import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; @@ -59,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; @@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import 
org.apache.hadoop.yarn.state.StateMachineFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Implementation of Task interface. */ @@ -127,62 +129,62 @@ public abstract class TaskImpl implements Task, EventHandler { KILL_TRANSITION = new KillTransition(); private static final StateMachineFactory - + stateMachineFactory - = new StateMachineFactory - (TaskState.NEW) + = new StateMachineFactory + (TaskStateInternal.NEW) // define the state machine of Task // Transitions from NEW state - .addTransition(TaskState.NEW, TaskState.SCHEDULED, + .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, TaskEventType.T_SCHEDULE, new InitialScheduleTransition()) - .addTransition(TaskState.NEW, TaskState.KILLED, + .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, TaskEventType.T_KILL, new KillNewTransition()) // Transitions from SCHEDULED state //when the first attempt is launched, the task state is set to RUNNING - .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, + .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING, TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition()) - .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, + .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT, TaskEventType.T_KILL, KILL_TRANSITION) - .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, + .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED, TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION) - .addTransition(TaskState.SCHEDULED, - EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), + .addTransition(TaskStateInternal.SCHEDULED, + EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED), TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition()) // Transitions from RUNNING state - .addTransition(TaskState.RUNNING, TaskState.RUNNING, + .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may 
start later - .addTransition(TaskState.RUNNING, TaskState.RUNNING, + .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ATTEMPT_COMMIT_PENDING, new AttemptCommitPendingTransition()) - .addTransition(TaskState.RUNNING, TaskState.RUNNING, + .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) - .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, + .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition()) - .addTransition(TaskState.RUNNING, TaskState.RUNNING, + .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION) - .addTransition(TaskState.RUNNING, - EnumSet.of(TaskState.RUNNING, TaskState.FAILED), + .addTransition(TaskStateInternal.RUNNING, + EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED), TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition()) - .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, + .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT, TaskEventType.T_KILL, KILL_TRANSITION) // Transitions from KILL_WAIT state - .addTransition(TaskState.KILL_WAIT, - EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED), + .addTransition(TaskStateInternal.KILL_WAIT, + EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED), TaskEventType.T_ATTEMPT_KILLED, new KillWaitAttemptKilledTransition()) // Ignore-able transitions. 
.addTransition( - TaskState.KILL_WAIT, - TaskState.KILL_WAIT, + TaskStateInternal.KILL_WAIT, + TaskStateInternal.KILL_WAIT, EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_LAUNCHED, TaskEventType.T_ATTEMPT_COMMIT_PENDING, @@ -191,32 +193,32 @@ public abstract class TaskImpl implements Task, EventHandler { TaskEventType.T_ADD_SPEC_ATTEMPT)) // Transitions from SUCCEEDED state - .addTransition(TaskState.SUCCEEDED, - EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED), + .addTransition(TaskStateInternal.SUCCEEDED, + EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED), TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition()) - .addTransition(TaskState.SUCCEEDED, - EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED), + .addTransition(TaskStateInternal.SUCCEEDED, + EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED), TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition()) // Ignore-able transitions. .addTransition( - TaskState.SUCCEEDED, TaskState.SUCCEEDED, + TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT, TaskEventType.T_ATTEMPT_LAUNCHED)) // Transitions from FAILED state - .addTransition(TaskState.FAILED, TaskState.FAILED, + .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ADD_SPEC_ATTEMPT)) // Transitions from KILLED state - .addTransition(TaskState.KILLED, TaskState.KILLED, + .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ADD_SPEC_ATTEMPT)) // create the topology tables .installTopology(); - private final StateMachine + private final StateMachine stateMachine; // By default, the next TaskAttempt number is zero. 
Changes during recovery @@ -247,7 +249,12 @@ public abstract class TaskImpl implements Task, EventHandler { @Override public TaskState getState() { - return stateMachine.getCurrentState(); + readLock.lock(); + try { + return getExternalState(getInternalState()); + } finally { + readLock.unlock(); + } } public TaskImpl(JobId jobId, TaskType taskType, int partition, @@ -356,9 +363,9 @@ public abstract class TaskImpl implements Task, EventHandler { readLock.lock(); try { // TODO: Use stateMachine level method? - return (getState() == TaskState.SUCCEEDED || - getState() == TaskState.FAILED || - getState() == TaskState.KILLED); + return (getInternalState() == TaskStateInternal.SUCCEEDED || + getInternalState() == TaskStateInternal.FAILED || + getInternalState() == TaskStateInternal.KILLED); } finally { readLock.unlock(); } @@ -433,6 +440,24 @@ public abstract class TaskImpl implements Task, EventHandler { } } + @VisibleForTesting + public TaskStateInternal getInternalState() { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + private static TaskState getExternalState(TaskStateInternal smState) { + if (smState == TaskStateInternal.KILL_WAIT) { + return TaskState.KILLED; + } else { + return TaskState.valueOf(smState.name()); + } + } + //this is always called in read/write lock private long getLaunchTime() { long taskLaunchTime = 0; @@ -484,8 +509,8 @@ public abstract class TaskImpl implements Task, EventHandler { return finishTime; } - private TaskState finished(TaskState finalState) { - if (getState() == TaskState.RUNNING) { + private TaskStateInternal finished(TaskStateInternal finalState) { + if (getInternalState() == TaskStateInternal.RUNNING) { metrics.endRunningTask(this); } return finalState; @@ -500,11 +525,7 @@ public abstract class TaskImpl implements Task, EventHandler { switch (at.getState()) { // ignore all failed task attempts - case FAIL_CONTAINER_CLEANUP: - case FAIL_TASK_CLEANUP: case 
FAILED: - case KILL_CONTAINER_CLEANUP: - case KILL_TASK_CLEANUP: case KILLED: continue; } @@ -605,7 +626,7 @@ public abstract class TaskImpl implements Task, EventHandler { } try { writeLock.lock(); - TaskState oldState = getState(); + TaskStateInternal oldState = getInternalState(); try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { @@ -613,9 +634,9 @@ public abstract class TaskImpl implements Task, EventHandler { + this.taskId, e); internalError(event.getType()); } - if (oldState != getState()) { + if (oldState != getInternalState()) { LOG.info(taskId + " Task Transitioned from " + oldState + " to " - + getState()); + + getInternalState()); } } finally { @@ -659,7 +680,7 @@ public abstract class TaskImpl implements Task, EventHandler { } } - private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) { + private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) { TaskFinishedEvent tfe = new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), TypeConverter.fromYarn(task.successfulAttempt), @@ -670,7 +691,7 @@ public abstract class TaskImpl implements Task, EventHandler { return tfe; } - private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List diag, TaskState taskState, TaskAttemptId taId) { + private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List diag, TaskStateInternal taskState, TaskAttemptId taId) { StringBuilder errorSb = new StringBuilder(); if (diag != null) { for (String d : diag) { @@ -775,7 +796,7 @@ public abstract class TaskImpl implements Task, EventHandler { // issue kill to all other attempts if (task.historyTaskStartGenerated) { TaskFinishedEvent tfe = createTaskFinishedEvent(task, - TaskState.SUCCEEDED); + TaskStateInternal.SUCCEEDED); task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe)); } @@ -791,7 +812,7 @@ public abstract class TaskImpl implements Task, 
EventHandler { TaskAttemptEventType.TA_KILL)); } } - task.finished(TaskState.SUCCEEDED); + task.finished(TaskStateInternal.SUCCEEDED); } } @@ -812,12 +833,12 @@ public abstract class TaskImpl implements Task, EventHandler { private static class KillWaitAttemptKilledTransition implements - MultipleArcTransition { + MultipleArcTransition { - protected TaskState finalState = TaskState.KILLED; + protected TaskStateInternal finalState = TaskStateInternal.KILLED; @Override - public TaskState transition(TaskImpl task, TaskEvent event) { + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.handleTaskAttemptCompletion( ((TaskTAttemptEvent) event).getTaskAttemptID(), TaskAttemptCompletionEventStatus.KILLED); @@ -835,18 +856,18 @@ public abstract class TaskImpl implements Task, EventHandler { } task.eventHandler.handle( - new JobTaskEvent(task.taskId, finalState)); + new JobTaskEvent(task.taskId, getExternalState(finalState))); return finalState; } - return task.getState(); + return task.getInternalState(); } } private static class AttemptFailedTransition implements - MultipleArcTransition { + MultipleArcTransition { @Override - public TaskState transition(TaskImpl task, TaskEvent event) { + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) { @@ -878,7 +899,7 @@ public abstract class TaskImpl implements Task, EventHandler { if (task.historyTaskStartGenerated) { TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(), - TaskState.FAILED, taId); + TaskStateInternal.FAILED, taId); task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), taskFailedEvent)); } else { @@ -887,13 +908,13 @@ public abstract class TaskImpl implements Task, EventHandler { } task.eventHandler.handle( new JobTaskEvent(task.taskId, TaskState.FAILED)); - return 
task.finished(TaskState.FAILED); + return task.finished(TaskStateInternal.FAILED); } return getDefaultState(task); } - protected TaskState getDefaultState(Task task) { - return task.getState(); + protected TaskStateInternal getDefaultState(TaskImpl task) { + return task.getInternalState(); } } @@ -901,14 +922,14 @@ public abstract class TaskImpl implements Task, EventHandler { extends AttemptFailedTransition { @Override - public TaskState transition(TaskImpl task, TaskEvent event) { + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { if (event instanceof TaskTAttemptEvent) { TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; - if (task.getState() == TaskState.SUCCEEDED && + if (task.getInternalState() == TaskStateInternal.SUCCEEDED && !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous // succeeded state - return TaskState.SUCCEEDED; + return TaskStateInternal.SUCCEEDED; } } @@ -933,25 +954,25 @@ public abstract class TaskImpl implements Task, EventHandler { } @Override - protected TaskState getDefaultState(Task task) { - return TaskState.SCHEDULED; + protected TaskStateInternal getDefaultState(TaskImpl task) { + return TaskStateInternal.SCHEDULED; } } private static class RetroactiveKilledTransition implements - MultipleArcTransition { + MultipleArcTransition { @Override - public TaskState transition(TaskImpl task, TaskEvent event) { + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { TaskAttemptId attemptId = null; if (event instanceof TaskTAttemptEvent) { TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; attemptId = castEvent.getTaskAttemptID(); - if (task.getState() == TaskState.SUCCEEDED && + if (task.getInternalState() == TaskStateInternal.SUCCEEDED && !attemptId.equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous // succeeded state - return TaskState.SUCCEEDED; + return 
TaskStateInternal.SUCCEEDED; } } @@ -977,7 +998,7 @@ public abstract class TaskImpl implements Task, EventHandler { // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. task.addAndScheduleAttempt(); - return TaskState.SCHEDULED; + return TaskStateInternal.SCHEDULED; } } @@ -988,7 +1009,7 @@ public abstract class TaskImpl implements Task, EventHandler { if (task.historyTaskStartGenerated) { TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null, - TaskState.KILLED, null); // TODO Verify failedAttemptId is null + TaskStateInternal.KILLED, null); // TODO Verify failedAttemptId is null task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), taskFailedEvent)); }else { @@ -996,8 +1017,8 @@ public abstract class TaskImpl implements Task, EventHandler { " generated for task: " + task.getID()); } - task.eventHandler.handle( - new JobTaskEvent(task.taskId, TaskState.KILLED)); + task.eventHandler.handle(new JobTaskEvent(task.taskId, + getExternalState(TaskStateInternal.KILLED))); task.metrics.endWaitingTask(task); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index e587ba852e..63e92467ea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -31,10 +31,11 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import 
org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -163,13 +164,14 @@ public abstract class RMCommunicator extends AbstractService { protected void unregister() { try { FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; - if (job.getState() == JobState.SUCCEEDED) { + JobImpl jobImpl = (JobImpl)job; + if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) { finishState = FinalApplicationStatus.SUCCEEDED; - } else if (job.getState() == JobState.KILLED - || (job.getState() == JobState.RUNNING && isSignalled)) { + } else if (jobImpl.getInternalState() == JobStateInternal.KILLED + || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) { finishState = FinalApplicationStatus.KILLED; - } else if (job.getState() == JobState.FAILED - || job.getState() == JobState.ERROR) { + } else if (jobImpl.getInternalState() == JobStateInternal.FAILED + || jobImpl.getInternalState() == JobStateInternal.ERROR) { finishState = FinalApplicationStatus.FAILED; } StringBuffer sb = new StringBuffer(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java index ab7d23ef9d..25f9820b9a 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java @@ -365,7 +365,7 @@ public class DefaultSpeculator extends AbstractService implements for (TaskAttempt taskAttempt : attempts.values()) { if (taskAttempt.getState() == TaskAttemptState.RUNNING - || taskAttempt.getState() == TaskAttemptState.ASSIGNED) { + || taskAttempt.getState() == TaskAttemptState.STARTING) { if (++numberRunningAttempts > 1) { return ALREADY_SPECULATING; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 5bf26fed0f..2fe3dcf154 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -50,8 +50,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import 
org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; @@ -60,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; @@ -240,6 +243,24 @@ public class MRApp extends MRAppMaster { return job; } + public void waitForInternalState(TaskAttemptImpl attempt, + TaskAttemptStateInternal finalState) throws Exception { + int timeoutSecs = 0; + TaskAttemptReport report = attempt.getReport(); + TaskAttemptStateInternal iState = attempt.getInternalState(); + while (!finalState.equals(iState) && timeoutSecs++ < 20) { + System.out.println("TaskAttempt Internal State is : " + iState + + " Waiting for Internal state : " + finalState + " progress : " + + report.getProgress()); + Thread.sleep(500); + report = attempt.getReport(); + iState = attempt.getInternalState(); + } + System.out.println("TaskAttempt Internal State is : " + iState); + Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)", + finalState, iState); + } + public void waitForState(TaskAttempt attempt, TaskAttemptState finalState) throws Exception { int timeoutSecs = 0; @@ -501,18 +522,18 @@ public class MRApp extends MRAppMaster { //override the init transition private final TestInitTransition initTransition = new TestInitTransition( maps, reduces); - StateMachineFactory localFactory - = stateMachineFactory.addTransition(JobState.NEW, - EnumSet.of(JobState.INITED, JobState.FAILED), + StateMachineFactory localFactory + = stateMachineFactory.addTransition(JobStateInternal.NEW, + 
EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), JobEventType.JOB_INIT, // This is abusive. initTransition); - private final StateMachine + private final StateMachine localStateMachine; @Override - protected StateMachine getStateMachine() { + protected StateMachine getStateMachine() { return localStateMachine; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 60ec171c5f..3af570823a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -36,8 +36,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; @@ -190,7 +192,8 @@ public class TestFail { Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts .size()); TaskAttempt attempt = attempts.values().iterator().next(); - app.waitForState(attempt, TaskAttemptState.ASSIGNED); + app.waitForInternalState((TaskAttemptImpl) attempt, + 
TaskAttemptStateInternal.ASSIGNED); app.getDispatcher().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_CONTAINER_COMPLETED)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 4c8f78b97b..cd8e1c5de3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; @@ -56,11 +55,13 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; @@ -411,8 +412,8 @@ public class TestRMContainerAllocator { // Wait till all map-attempts request for containers for (Task t : job.getTasks().values()) { if (t.getType() == TaskType.MAP) { - mrApp.waitForState(t.getAttempts().values().iterator().next(), - TaskAttemptState.UNASSIGNED); + mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values() + .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); } } amDispatcher.await(); @@ -562,8 +563,8 @@ public class TestRMContainerAllocator { amDispatcher.await(); // Wait till all map-attempts request for containers for (Task t : job.getTasks().values()) { - mrApp.waitForState(t.getAttempts().values().iterator().next(), - TaskAttemptState.UNASSIGNED); + mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values() + .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); } amDispatcher.await(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index dd1691deac..7a77a6b208 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -42,8 +42,8 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import 
org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; @@ -77,11 +77,11 @@ public class TestJobImpl { tasks.put(mockTask.getID(), mockTask); mockJob.tasks = tasks; - when(mockJob.getState()).thenReturn(JobState.ERROR); + when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR); JobEvent mockJobEvent = mock(JobEvent.class); - JobState state = trans.transition(mockJob, mockJobEvent); + JobStateInternal state = trans.transition(mockJob, mockJobEvent); Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition", - JobState.ERROR, state); + JobStateInternal.ERROR, state); } @Test @@ -96,9 +96,12 @@ public class TestJobImpl { when(mockJob.getCommitter()).thenReturn(mockCommitter); when(mockJob.getEventHandler()).thenReturn(mockEventHandler); when(mockJob.getJobContext()).thenReturn(mockJobContext); - when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED); - when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED); - when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED); + when(mockJob.finished(JobStateInternal.KILLED)).thenReturn( + JobStateInternal.KILLED); + when(mockJob.finished(JobStateInternal.FAILED)).thenReturn( + JobStateInternal.FAILED); + when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn( + JobStateInternal.SUCCEEDED); try { doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class)); @@ -106,11 +109,11 @@ public class TestJobImpl { // commitJob 
stubbed out, so this can't happen } doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class)); - JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob); + JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob); Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " + "for successful job", jobState); Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", - JobState.FAILED, jobState); + JobStateInternal.FAILED, jobState); verify(mockJob).abortJob( eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); } @@ -129,7 +132,8 @@ public class TestJobImpl { when(mockJob.getJobContext()).thenReturn(mockJobContext); doNothing().when(mockJob).setFinishTime(); doNothing().when(mockJob).logJobHistoryFinishedEvent(); - when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED); + when(mockJob.finished(any(JobStateInternal.class))).thenReturn( + JobStateInternal.SUCCEEDED); try { doNothing().when(mockCommitter).commitJob(any(JobContext.class)); @@ -141,7 +145,7 @@ public class TestJobImpl { "for successful job", JobImpl.checkJobCompleteSuccess(mockJob)); Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", - JobState.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob)); + JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob)); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 765e56f4db..da21dd79db 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; @@ -48,13 +47,13 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; +import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -338,7 +337,7 @@ public class TestTaskImpl { * {@link TaskState#KILL_WAIT} */ private void assertTaskKillWaitState() { - assertEquals(TaskState.KILL_WAIT, mockTask.getState()); + assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState()); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 9ae938a808..e1bab01756 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManager; @@ -260,7 +262,8 @@ public class TestContainerLauncher { attempts.size()); TaskAttempt attempt = attempts.values().iterator().next(); - app.waitForState(attempt, TaskAttemptState.ASSIGNED); + app.waitForInternalState((TaskAttemptImpl) attempt, + TaskAttemptStateInternal.ASSIGNED); app.waitForState(job, JobState.FAILED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index 173807a618..d319a7f883 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -128,14 +128,26 @@ public class TypeConverter { return taskId; } - public static TaskAttemptState toYarn(org.apache.hadoop.mapred.TaskStatus.State state) { - if (state == 
org.apache.hadoop.mapred.TaskStatus.State.KILLED_UNCLEAN) { - return TaskAttemptState.KILLED; - } - if (state == org.apache.hadoop.mapred.TaskStatus.State.FAILED_UNCLEAN) { + public static TaskAttemptState toYarn( + org.apache.hadoop.mapred.TaskStatus.State state) { + switch (state) { + case COMMIT_PENDING: + return TaskAttemptState.COMMIT_PENDING; + case FAILED: + case FAILED_UNCLEAN: return TaskAttemptState.FAILED; + case KILLED: + case KILLED_UNCLEAN: + return TaskAttemptState.KILLED; + case RUNNING: + return TaskAttemptState.RUNNING; + case SUCCEEDED: + return TaskAttemptState.SUCCEEDED; + case UNASSIGNED: + return TaskAttemptState.STARTING; + default: + throw new YarnException("Unrecognized State: " + state); } - return TaskAttemptState.valueOf(state.toString()); } public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) { @@ -309,7 +321,6 @@ public class TypeConverter { return org.apache.hadoop.mapred.JobStatus.PREP; case RUNNING: return org.apache.hadoop.mapred.JobStatus.RUNNING; - case KILL_WAIT: case KILLED: return org.apache.hadoop.mapred.JobStatus.KILLED; case SUCCEEDED: @@ -329,7 +340,6 @@ public class TypeConverter { return org.apache.hadoop.mapred.TIPStatus.PENDING; case RUNNING: return org.apache.hadoop.mapred.TIPStatus.RUNNING; - case KILL_WAIT: case KILLED: return org.apache.hadoop.mapred.TIPStatus.KILLED; case SUCCEEDED: diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java index a8151c9498..e80013ebce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java @@ -24,7 +24,6 @@ public enum JobState { RUNNING, SUCCEEDED, FAILED, - KILL_WAIT, KILLED, ERROR } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java index 00378dc9df..3c361cce6a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java @@ -20,16 +20,10 @@ package org.apache.hadoop.mapreduce.v2.api.records; public enum TaskAttemptState { NEW, - UNASSIGNED, - ASSIGNED, + STARTING, RUNNING, - COMMIT_PENDING, - SUCCESS_CONTAINER_CLEANUP, - SUCCEEDED, - FAIL_CONTAINER_CLEANUP, - FAIL_TASK_CLEANUP, - FAILED, - KILL_CONTAINER_CLEANUP, - KILL_TASK_CLEANUP, + COMMIT_PENDING, + SUCCEEDED, + FAILED, KILLED } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java index a7fdbc86de..00cd6e2861 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java @@ -19,5 
+19,5 @@ package org.apache.hadoop.mapreduce.v2.api.records; public enum TaskState { - NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED + NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILLED } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 055b07996f..169ba4b4c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -49,8 +49,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.yarn.ContainerLogAppender; import org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -100,15 +100,10 @@ public class MRApps extends Apps { public static enum TaskAttemptStateUI { NEW( new TaskAttemptState[] { TaskAttemptState.NEW, - TaskAttemptState.UNASSIGNED, TaskAttemptState.ASSIGNED }), + TaskAttemptState.STARTING }), RUNNING( new TaskAttemptState[] { TaskAttemptState.RUNNING, - TaskAttemptState.COMMIT_PENDING, - TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptState.FAIL_CONTAINER_CLEANUP, - TaskAttemptState.FAIL_TASK_CLEANUP, - TaskAttemptState.KILL_CONTAINER_CLEANUP, - TaskAttemptState.KILL_TASK_CLEANUP }), + 
TaskAttemptState.COMMIT_PENDING }), SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}), FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}), KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED}); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto index 95345ac816..c0a4e92c5b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto @@ -50,8 +50,7 @@ enum TaskStateProto { TS_RUNNING = 3; TS_SUCCEEDED = 4; TS_FAILED = 5; - TS_KILL_WAIT = 6; - TS_KILLED = 7; + TS_KILLED = 6; } enum PhaseProto { @@ -93,18 +92,12 @@ message TaskReportProto { enum TaskAttemptStateProto { TA_NEW = 1; - TA_UNASSIGNED = 2; - TA_ASSIGNED = 3; - TA_RUNNING = 4; - TA_COMMIT_PENDING = 5; - TA_SUCCESS_CONTAINER_CLEANUP = 6; - TA_SUCCEEDED = 7; - TA_FAIL_CONTAINER_CLEANUP = 8; - TA_FAIL_TASK_CLEANUP = 9; - TA_FAILED = 10; - TA_KILL_CONTAINER_CLEANUP = 11; - TA_KILL_TASK_CLEANUP = 12; - TA_KILLED = 13; + TA_STARTING = 2; + TA_RUNNING = 3; + TA_COMMIT_PENDING = 4; + TA_SUCCEEDED = 5; + TA_FAILED = 6; + TA_KILLED = 7; } message TaskAttemptReportProto { @@ -131,9 +124,8 @@ enum JobStateProto { J_RUNNING = 3; J_SUCCEEDED = 4; J_FAILED = 5; - J_KILL_WAIT = 6; - J_KILLED = 7; - J_ERROR = 8; + J_KILLED = 6; + J_ERROR = 7; } message JobReportProto {