MAPREDUCE-4102. job counters not available in Jobhistory webui for killed jobs (Bhallamudi Venkata Siva Kamesh via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1339174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9142dcac9e
commit
7e3d016845
@ -500,6 +500,9 @@ Release 0.23.3 - UNRELEASED
|
|||||||
|
|
||||||
MAPREDUCE-4238. mavenize data_join. (tgraves)
|
MAPREDUCE-4238. mavenize data_join. (tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-4102. job counters not available in Jobhistory webui for
|
||||||
|
killed jobs (Bhallamudi Venkata Siva Kamesh via tgraves)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -69,7 +69,7 @@ public class CountersBlock extends HtmlBlock {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(total == null || total.getGroupNames() == null) {
|
if(total == null || total.getGroupNames() == null || total.countCounters() == 0) {
|
||||||
String type = $(TASK_ID);
|
String type = $(TASK_ID);
|
||||||
if(type == null || type.isEmpty()) {
|
if(type == null || type.isEmpty()) {
|
||||||
type = $(JOB_ID, "the job");
|
type = $(JOB_ID, "the job");
|
||||||
@ -180,14 +180,25 @@ private void getCounters(AppContext ctx) {
|
|||||||
// Get all types of counters
|
// Get all types of counters
|
||||||
Map<TaskId, Task> tasks = job.getTasks();
|
Map<TaskId, Task> tasks = job.getTasks();
|
||||||
total = job.getAllCounters();
|
total = job.getAllCounters();
|
||||||
|
boolean needTotalCounters = false;
|
||||||
|
if (total == null) {
|
||||||
|
total = new Counters();
|
||||||
|
needTotalCounters = true;
|
||||||
|
}
|
||||||
map = new Counters();
|
map = new Counters();
|
||||||
reduce = new Counters();
|
reduce = new Counters();
|
||||||
for (Task t : tasks.values()) {
|
for (Task t : tasks.values()) {
|
||||||
Counters counters = t.getCounters();
|
Counters counters = t.getCounters();
|
||||||
|
if (counters == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
switch (t.getType()) {
|
switch (t.getType()) {
|
||||||
case MAP: map.incrAllCounters(counters); break;
|
case MAP: map.incrAllCounters(counters); break;
|
||||||
case REDUCE: reduce.incrAllCounters(counters); break;
|
case REDUCE: reduce.incrAllCounters(counters); break;
|
||||||
}
|
}
|
||||||
|
if (needTotalCounters) {
|
||||||
|
total.incrAllCounters(counters);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +81,9 @@ private void getCounters(AppContext ctx, Job job) {
|
|||||||
Map<TaskId, Task> tasks = job.getTasks();
|
Map<TaskId, Task> tasks = job.getTasks();
|
||||||
for (Task t : tasks.values()) {
|
for (Task t : tasks.values()) {
|
||||||
Counters counters = t.getCounters();
|
Counters counters = t.getCounters();
|
||||||
|
if (counters == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
total.incrAllCounters(counters);
|
total.incrAllCounters(counters);
|
||||||
switch (t.getType()) {
|
switch (t.getType()) {
|
||||||
case MAP:
|
case MAP:
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -131,6 +132,17 @@ public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
|
|||||||
}
|
}
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
|
||||||
|
int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks) {
|
||||||
|
Map<JobId, Job> map = Maps.newHashMap();
|
||||||
|
for (int j = 0; j < numJobsPerApp; ++j) {
|
||||||
|
Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask, null,
|
||||||
|
hasFailedTasks);
|
||||||
|
map.put(job.getID(), job);
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
public static JobId newJobID(ApplicationId appID, int i) {
|
public static JobId newJobID(ApplicationId appID, int i) {
|
||||||
JobId id = Records.newRecord(JobId.class);
|
JobId id = Records.newRecord(JobId.class);
|
||||||
@ -316,16 +328,16 @@ public String getNodeRackName() {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<TaskId, Task> newTasks(JobId jid, int n, int m) {
|
public static Map<TaskId, Task> newTasks(JobId jid, int n, int m, boolean hasFailedTasks) {
|
||||||
Map<TaskId, Task> map = Maps.newHashMap();
|
Map<TaskId, Task> map = Maps.newHashMap();
|
||||||
for (int i = 0; i < n; ++i) {
|
for (int i = 0; i < n; ++i) {
|
||||||
Task task = newTask(jid, i, m);
|
Task task = newTask(jid, i, m, hasFailedTasks);
|
||||||
map.put(task.getID(), task);
|
map.put(task.getID(), task);
|
||||||
}
|
}
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Task newTask(JobId jid, int i, int m) {
|
public static Task newTask(JobId jid, int i, int m, final boolean hasFailedTasks) {
|
||||||
final TaskId tid = Records.newRecord(TaskId.class);
|
final TaskId tid = Records.newRecord(TaskId.class);
|
||||||
tid.setJobId(jid);
|
tid.setJobId(jid);
|
||||||
tid.setId(i);
|
tid.setId(i);
|
||||||
@ -345,6 +357,9 @@ public TaskReport getReport() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Counters getCounters() {
|
public Counters getCounters() {
|
||||||
|
if (hasFailedTasks) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
return new Counters(
|
return new Counters(
|
||||||
TypeConverter.fromYarn(report.getCounters()));
|
TypeConverter.fromYarn(report.getCounters()));
|
||||||
}
|
}
|
||||||
@ -394,8 +409,14 @@ public TaskState getState() {
|
|||||||
|
|
||||||
public static Counters getCounters(
|
public static Counters getCounters(
|
||||||
Collection<Task> tasks) {
|
Collection<Task> tasks) {
|
||||||
|
List<Task> completedTasks = new ArrayList<Task>();
|
||||||
|
for (Task task : tasks) {
|
||||||
|
if (task.getCounters() != null) {
|
||||||
|
completedTasks.add(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
Counters counters = new Counters();
|
Counters counters = new Counters();
|
||||||
return JobImpl.incrTaskCounters(counters, tasks);
|
return JobImpl.incrTaskCounters(counters, completedTasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
static class TaskCount {
|
static class TaskCount {
|
||||||
@ -434,10 +455,15 @@ public static Job newJob(ApplicationId appID, int i, int n, int m) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile) {
|
public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile) {
|
||||||
|
return newJob(appID, i, n, m, confFile, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Job newJob(ApplicationId appID, int i, int n, int m,
|
||||||
|
Path confFile, boolean hasFailedTasks) {
|
||||||
final JobId id = newJobID(appID, i);
|
final JobId id = newJobID(appID, i);
|
||||||
final String name = newJobName();
|
final String name = newJobName();
|
||||||
final JobReport report = newJobReport(id);
|
final JobReport report = newJobReport(id);
|
||||||
final Map<TaskId, Task> tasks = newTasks(id, n, m);
|
final Map<TaskId, Task> tasks = newTasks(id, n, m, hasFailedTasks);
|
||||||
final TaskCount taskCount = getTaskCount(tasks.values());
|
final TaskCount taskCount = getTaskCount(tasks.values());
|
||||||
final Counters counters = getCounters(tasks
|
final Counters counters = getCounters(tasks
|
||||||
.values());
|
.values());
|
||||||
|
@ -43,6 +43,14 @@ public static JobsPair newHistoryJobs(ApplicationId appID, int numJobsPerApp,
|
|||||||
numAttemptsPerTask);
|
numAttemptsPerTask);
|
||||||
return split(mocked);
|
return split(mocked);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static JobsPair newHistoryJobs(ApplicationId appID, int numJobsPerApp,
|
||||||
|
int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks)
|
||||||
|
throws IOException {
|
||||||
|
Map<JobId, Job> mocked = newJobs(appID, numJobsPerApp, numTasksPerJob,
|
||||||
|
numAttemptsPerTask, hasFailedTasks);
|
||||||
|
return split(mocked);
|
||||||
|
}
|
||||||
|
|
||||||
private static JobsPair split(Map<JobId, Job> mocked) throws IOException {
|
private static JobsPair split(Map<JobId, Job> mocked) throws IOException {
|
||||||
JobsPair ret = new JobsPair();
|
JobsPair ret = new JobsPair();
|
||||||
|
@ -63,10 +63,16 @@ static class TestAppContext implements AppContext {
|
|||||||
final Map<JobId, Job> jobs;
|
final Map<JobId, Job> jobs;
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
|
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts,
|
||||||
|
boolean hasFailedTasks) {
|
||||||
appID = MockJobs.newAppID(appid);
|
appID = MockJobs.newAppID(appid);
|
||||||
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
|
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
|
||||||
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
|
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts,
|
||||||
|
hasFailedTasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
|
||||||
|
this(appid, numJobs, numTasks, numAttempts, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
TestAppContext() {
|
TestAppContext() {
|
||||||
@ -198,6 +204,14 @@ public void testTaskView() {
|
|||||||
appContext, params);
|
appContext, params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void testJobCounterViewForKilledJob() {
|
||||||
|
LOG.info("JobCounterViewForKilledJob");
|
||||||
|
AppContext appContext = new TestAppContext(0, 1, 1, 1, true);
|
||||||
|
Map<String, String> params = TestAMWebApp.getJobParams(appContext);
|
||||||
|
WebAppTests.testPage(HsCountersPage.class, AppContext.class,
|
||||||
|
appContext, params);
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void testSingleCounterView() {
|
@Test public void testSingleCounterView() {
|
||||||
LOG.info("HsSingleCounterPage");
|
LOG.info("HsSingleCounterPage");
|
||||||
WebAppTests.testPage(HsSingleCounterPage.class, AppContext.class,
|
WebAppTests.testPage(HsSingleCounterPage.class, AppContext.class,
|
||||||
|
@ -101,13 +101,15 @@ static class TestAppContext implements HistoryContext {
|
|||||||
final Map<JobId, Job> partialJobs;
|
final Map<JobId, Job> partialJobs;
|
||||||
final Map<JobId, Job> fullJobs;
|
final Map<JobId, Job> fullJobs;
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
|
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts,
|
||||||
|
boolean hasFailedTasks) {
|
||||||
appID = MockJobs.newAppID(appid);
|
appID = MockJobs.newAppID(appid);
|
||||||
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
|
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
|
||||||
JobsPair jobs;
|
JobsPair jobs;
|
||||||
try {
|
try {
|
||||||
jobs = MockHistoryJobs.newHistoryJobs(appID, numJobs, numTasks, numAttempts);
|
jobs = MockHistoryJobs.newHistoryJobs(appID, numJobs, numTasks,
|
||||||
|
numAttempts, hasFailedTasks);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnException(e);
|
throw new YarnException(e);
|
||||||
}
|
}
|
||||||
@ -115,6 +117,10 @@ static class TestAppContext implements HistoryContext {
|
|||||||
fullJobs = jobs.full;
|
fullJobs = jobs.full;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
|
||||||
|
this(appid, numJobs, numTasks, numAttempts, false);
|
||||||
|
}
|
||||||
|
|
||||||
TestAppContext() {
|
TestAppContext() {
|
||||||
this(0, 1, 2, 1);
|
this(0, 1, 2, 1);
|
||||||
}
|
}
|
||||||
@ -628,6 +634,46 @@ public void testJobCountersSlash() throws JSONException, Exception {
|
|||||||
verifyHsJobCounters(info, jobsMap.get(id));
|
verifyHsJobCounters(info, jobsMap.get(id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJobCountersForKilledJob() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
appContext = new TestAppContext(0, 1, 1, 1, true);
|
||||||
|
injector = Guice.createInjector(new ServletModule() {
|
||||||
|
@Override
|
||||||
|
protected void configureServlets() {
|
||||||
|
|
||||||
|
webApp = mock(HsWebApp.class);
|
||||||
|
when(webApp.name()).thenReturn("hsmockwebapp");
|
||||||
|
|
||||||
|
bind(JAXBContextResolver.class);
|
||||||
|
bind(HsWebServices.class);
|
||||||
|
bind(GenericExceptionHandler.class);
|
||||||
|
bind(WebApp.class).toInstance(webApp);
|
||||||
|
bind(AppContext.class).toInstance(appContext);
|
||||||
|
bind(HistoryContext.class).toInstance(appContext);
|
||||||
|
bind(Configuration.class).toInstance(conf);
|
||||||
|
|
||||||
|
serve("/*").with(GuiceContainer.class);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Map<JobId, Job> jobsMap = appContext.getAllJobs();
|
||||||
|
for (JobId id : jobsMap.keySet()) {
|
||||||
|
String jobId = MRApps.toString(id);
|
||||||
|
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("history")
|
||||||
|
.path("mapreduce").path("jobs").path(jobId).path("counters/")
|
||||||
|
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||||
|
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||||
|
JSONObject json = response.getEntity(JSONObject.class);
|
||||||
|
assertEquals("incorrect number of elements", 1, json.length());
|
||||||
|
JSONObject info = json.getJSONObject("jobCounters");
|
||||||
|
WebServicesTestUtils.checkStringMatch("id", MRApps.toString(id),
|
||||||
|
info.getString("id"));
|
||||||
|
assertTrue("Job shouldn't contain any counters", info.length() == 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJobCountersDefault() throws JSONException, Exception {
|
public void testJobCountersDefault() throws JSONException, Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user