MAPREDUCE-7140. Refactoring TaskAttemptInfo to separate Map and Reduce tasks. Contributed by Oleksandr Shevchenko

This commit is contained in:
Jason Lowe 2018-09-14 15:04:18 -05:00
parent 5470de420b
commit 488806baca
11 changed files with 77 additions and 37 deletions

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptState;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
@ -396,9 +397,9 @@ public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
if (task.getType() == TaskType.REDUCE) {
attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
attempts.add(new ReduceTaskAttemptInfo(ta));
} else {
attempts.add(new TaskAttemptInfo(ta, task.getType(), true));
attempts.add(new MapTaskAttemptInfo(ta, true));
}
}
}
@ -419,9 +420,9 @@ public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
Task task = getTaskFromTaskIdString(tid, job);
TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
if (task.getType() == TaskType.REDUCE) {
return new ReduceTaskAttemptInfo(ta, task.getType());
return new ReduceTaskAttemptInfo(ta);
} else {
return new TaskAttemptInfo(ta, task.getType(), true);
return new MapTaskAttemptInfo(ta, true);
}
}

View File

@ -42,8 +42,8 @@
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskCounterGroupInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskCounterInfo;
@ -62,9 +62,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
AppInfo.class, CounterInfo.class, JobTaskAttemptCounterInfo.class,
JobTaskCounterInfo.class, TaskCounterGroupInfo.class, ConfInfo.class,
JobCounterInfo.class, TaskCounterInfo.class, CounterGroupInfo.class,
JobInfo.class, JobsInfo.class, ReduceTaskAttemptInfo.class,
TaskAttemptInfo.class, TaskInfo.class, TasksInfo.class,
TaskAttemptsInfo.class, ConfEntryInfo.class, RemoteExceptionData.class};
JobInfo.class, JobsInfo.class, MapTaskAttemptInfo.class, ReduceTaskAttemptInfo.class,
TaskInfo.class, TasksInfo.class, TaskAttemptsInfo.class, ConfEntryInfo.class, RemoteExceptionData.class};
// these dao classes need root unwrapping
private final Class[] rootUnwrappedTypes = {JobTaskAttemptState.class};

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.util.StringUtils;
@ -124,7 +125,7 @@ protected void render(Block html) {
StringBuilder attemptsTableData = new StringBuilder("[\n");
for (TaskAttempt attempt : getTaskAttempts()) {
TaskAttemptInfo ta = new TaskAttemptInfo(attempt, true);
TaskAttemptInfo ta = new MapTaskAttemptInfo(attempt, true);
String progress = StringUtils.format("%.2f", ta.getProgress());
String nodeHttpAddr = ta.getNode();

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.webapp.dao;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "taskAttempt")
public class MapTaskAttemptInfo extends TaskAttemptInfo {
public MapTaskAttemptInfo() {
}
public MapTaskAttemptInfo(TaskAttempt ta) {
this(ta, false);
}
public MapTaskAttemptInfo(TaskAttempt ta, Boolean isRunning) {
super(ta, TaskType.MAP, isRunning);
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@ -27,7 +25,6 @@
import org.apache.hadoop.yarn.util.Times;
@XmlRootElement(name = "taskAttempt")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
protected long shuffleFinishTime;
@ -39,8 +36,12 @@ public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
public ReduceTaskAttemptInfo() {
}
public ReduceTaskAttemptInfo(TaskAttempt ta, TaskType type) {
super(ta, type, false);
public ReduceTaskAttemptInfo(TaskAttempt ta) {
this(ta, false);
}
public ReduceTaskAttemptInfo(TaskAttempt ta, Boolean isRunning) {
super(ta, TaskType.REDUCE, isRunning);
this.shuffleFinishTime = ta.getShuffleFinishTime();
this.mergeFinishTime = ta.getSortFinishTime();

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.mapreduce.v2.app.webapp.dao;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@ -31,13 +29,12 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
@XmlRootElement(name = "taskAttempt")
@XmlSeeAlso({ ReduceTaskAttemptInfo.class })
@XmlSeeAlso({MapTaskAttemptInfo.class, ReduceTaskAttemptInfo.class})
@XmlAccessorType(XmlAccessType.FIELD)
public class TaskAttemptInfo {
public abstract class TaskAttemptInfo {
protected long startTime;
protected long finishTime;
@ -58,10 +55,6 @@ public class TaskAttemptInfo {
public TaskAttemptInfo() {
}
public TaskAttemptInfo(TaskAttempt ta, Boolean isRunning) {
this(ta, TaskType.MAP, isRunning);
}
public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
final TaskAttemptReport report = ta.getReport();
this.type = type.toString();
@ -133,4 +126,7 @@ public String getNote() {
return this.diagnostics;
}
public String getType() {
return type;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.webapp.dao;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
@ -25,21 +26,20 @@
@XmlRootElement(name = "taskAttempts")
public class TaskAttemptsInfo {
protected ArrayList<TaskAttemptInfo> taskAttempt = new ArrayList<TaskAttemptInfo>();
protected List<TaskAttemptInfo> taskAttempts = new ArrayList<>();
public TaskAttemptsInfo() {
} // JAXB needs this
public void add(TaskAttemptInfo taskattemptInfo) {
taskAttempt.add(taskattemptInfo);
taskAttempts.add(taskattemptInfo);
}
// XmlElementRef annotation should be used to identify the exact type of a list element
// otherwise metadata will be added to XML attributes,
// it can lead to incorrect JSON marshaling
@XmlElementRef
public ArrayList<TaskAttemptInfo> getTaskAttempts() {
return taskAttempt;
public List<TaskAttemptInfo> getTaskAttempts() {
return taskAttempts;
}
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
@ -115,7 +116,7 @@ protected void render(Block html) {
StringBuilder attemptsTableData = new StringBuilder("[\n");
for (TaskAttempt attempt : getTaskAttempts()) {
final TaskAttemptInfo ta = new TaskAttemptInfo(attempt, false);
final TaskAttemptInfo ta = new MapTaskAttemptInfo(attempt, false);
String taid = ta.getId();
String nodeHttpAddr = ta.getNode();

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
@ -129,7 +130,7 @@ public class HsTasksBlock extends HtmlBlock {
if(successful != null) {
TaskAttemptInfo ta;
if(type == TaskType.REDUCE) {
ReduceTaskAttemptInfo rta = new ReduceTaskAttemptInfo(successful, type);
ReduceTaskAttemptInfo rta = new ReduceTaskAttemptInfo(successful);
shuffleFinishTime = rta.getShuffleFinishTime();
sortFinishTime = rta.getMergeFinishTime();
elapsedShuffleTime = rta.getElapsedShuffleTime();
@ -137,7 +138,7 @@ public class HsTasksBlock extends HtmlBlock {
elapsedReduceTime = rta.getElapsedReduceTime();
ta = rta;
} else {
ta = new TaskAttemptInfo(successful, type, false);
ta = new MapTaskAttemptInfo(successful, false);
}
attemptStartTime = ta.getStartTime();
attemptFinishTime = ta.getFinishTime();

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
@ -361,9 +362,9 @@ public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
if (task.getType() == TaskType.REDUCE) {
attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
attempts.add(new ReduceTaskAttemptInfo(ta));
} else {
attempts.add(new TaskAttemptInfo(ta, task.getType(), false));
attempts.add(new MapTaskAttemptInfo(ta, false));
}
}
}
@ -385,9 +386,9 @@ public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
if (task.getType() == TaskType.REDUCE) {
return new ReduceTaskAttemptInfo(ta, task.getType());
return new ReduceTaskAttemptInfo(ta);
} else {
return new TaskAttemptInfo(ta, task.getType(), false);
return new MapTaskAttemptInfo(ta, false);
}
}

View File

@ -36,8 +36,8 @@
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.MapTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskCounterGroupInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskCounterInfo;
@ -63,7 +63,7 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
JobsInfo.class, TaskInfo.class, TasksInfo.class, TaskAttemptsInfo.class,
ConfInfo.class, CounterInfo.class, JobTaskCounterInfo.class,
JobTaskAttemptCounterInfo.class, TaskCounterInfo.class,
JobCounterInfo.class, ReduceTaskAttemptInfo.class, TaskAttemptInfo.class,
JobCounterInfo.class, MapTaskAttemptInfo.class, ReduceTaskAttemptInfo.class,
TaskAttemptsInfo.class, CounterGroupInfo.class,
TaskCounterGroupInfo.class, AMAttemptInfo.class, AMAttemptsInfo.class,
RemoteExceptionData.class };