MAPREDUCE-6640. mapred job -history command should be able to take Job ID (rkanter)
This commit is contained in:
parent
def754ec06
commit
4343a4cf77
@ -319,6 +319,9 @@ Release 2.9.0 - UNRELEASED
|
||||
MAPREDUCE-6627. Add machine-readable output to mapred job -history
|
||||
command (rkanter)
|
||||
|
||||
MAPREDUCE-6640. mapred job -history command should be able to take
|
||||
Job ID (rkanter)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -327,7 +327,8 @@ public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) {
|
||||
jobreport.getMapProgress(), jobreport.getReduceProgress(),
|
||||
jobreport.getCleanupProgress(), fromYarn(jobreport.getJobState()),
|
||||
jobPriority, jobreport.getUser(), jobreport.getJobName(),
|
||||
jobreport.getJobFile(), trackingUrl, jobreport.isUber());
|
||||
jobreport.getJobFile(), trackingUrl, jobreport.isUber(),
|
||||
jobreport.getHistoryFile());
|
||||
jobStatus.setStartTime(jobreport.getStartTime());
|
||||
jobStatus.setFinishTime(jobreport.getFinishTime());
|
||||
jobStatus.setFailureInfo(jobreport.getDiagnostics());
|
||||
|
@ -40,6 +40,7 @@ public interface JobReport {
|
||||
public abstract List<AMInfo> getAMInfos();
|
||||
public abstract boolean isUber();
|
||||
public abstract Priority getJobPriority();
|
||||
public abstract String getHistoryFile();
|
||||
|
||||
public abstract void setJobId(JobId jobId);
|
||||
public abstract void setJobState(JobState jobState);
|
||||
@ -58,4 +59,5 @@ public interface JobReport {
|
||||
public abstract void setAMInfos(List<AMInfo> amInfos);
|
||||
public abstract void setIsUber(boolean isUber);
|
||||
public abstract void setJobPriority(Priority priority);
|
||||
public abstract void setHistoryFile(String historyFile);
|
||||
}
|
||||
|
@ -380,4 +380,16 @@ public synchronized void setJobPriority(Priority priority) {
|
||||
}
|
||||
this.jobPriority = priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getHistoryFile() {
|
||||
JobReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getHistoryFile();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setHistoryFile(String historyFile) {
|
||||
maybeInitBuilder();
|
||||
builder.setHistoryFile(historyFile);
|
||||
}
|
||||
}
|
||||
|
@ -147,6 +147,7 @@ message JobReportProto {
|
||||
optional int64 submit_time = 15;
|
||||
optional bool is_uber = 16 [default = false];
|
||||
optional hadoop.yarn.PriorityProto jobPriority = 17;
|
||||
optional string historyFile = 18;
|
||||
}
|
||||
|
||||
message AMInfoProto {
|
||||
|
@ -201,12 +201,12 @@ public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param jobFile job configuration file.
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
int runState, JobPriority jp, String user, String jobName,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
int runState, JobPriority jp, String user, String jobName,
|
||||
String jobFile, String trackingUrl) {
|
||||
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
runState, jp, user, jobName, "default", jobFile, trackingUrl);
|
||||
@ -223,17 +223,43 @@ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param jobFile job configuration file.
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
* @param isUber Whether job running in uber mode
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
int runState, JobPriority jp, String user, String jobName,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
int runState, JobPriority jp, String user, String jobName,
|
||||
String jobFile, String trackingUrl, boolean isUber) {
|
||||
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a job status object for a given jobid.
|
||||
* @param jobid The jobid of the job
|
||||
* @param setupProgress The progress made on the setup
|
||||
* @param mapProgress The progress made on the maps
|
||||
* @param reduceProgress The progress made on the reduces
|
||||
* @param cleanupProgress The progress made on the cleanup
|
||||
* @param runState The current state of the job
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
* @param isUber Whether job running in uber mode
|
||||
* @param historyFile history file
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
int runState, JobPriority jp, String user, String jobName,
|
||||
String jobFile, String trackingUrl, boolean isUber,
|
||||
String historyFile) {
|
||||
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber,
|
||||
historyFile);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a job status object for a given jobid.
|
||||
@ -281,11 +307,39 @@ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
int runState, JobPriority jp,
|
||||
String user, String jobName, String queue,
|
||||
String jobFile, String trackingUrl, boolean isUber) {
|
||||
super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
|
||||
user, jobName, queue, jobFile, trackingUrl, isUber);
|
||||
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
runState, jp, user, jobName, queue, jobFile, trackingUrl, isUber, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a job status object for a given jobid.
|
||||
* @param jobid The jobid of the job
|
||||
* @param setupProgress The progress made on the setup
|
||||
* @param mapProgress The progress made on the maps
|
||||
* @param reduceProgress The progress made on the reduces
|
||||
* @param cleanupProgress The progress made on the cleanup
|
||||
* @param runState The current state of the job
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param queue job queue name.
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
* @param isUber Whether job running in uber mode
|
||||
* @param historyFile history file
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
int runState, JobPriority jp,
|
||||
String user, String jobName, String queue,
|
||||
String jobFile, String trackingUrl, boolean isUber,
|
||||
String historyFile) {
|
||||
super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
getEnum(runState),
|
||||
org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
|
||||
user, jobName, queue, jobFile, trackingUrl, isUber, historyFile);
|
||||
}
|
||||
|
||||
public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){
|
||||
JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()),
|
||||
stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
|
||||
|
@ -152,43 +152,71 @@ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a job status object for a given jobid.
|
||||
* @param jobid The jobid of the job
|
||||
* @param setupProgress The progress made on the setup
|
||||
* @param mapProgress The progress made on the maps
|
||||
* @param reduceProgress The progress made on the reduces
|
||||
* @param cleanupProgress The progress made on the cleanup
|
||||
* @param runState The current state of the job
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param queue queue name
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
* @param isUber Whether job running in uber mode
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
State runState, JobPriority jp,
|
||||
String user, String jobName, String queue,
|
||||
String jobFile, String trackingUrl, boolean isUber) {
|
||||
this.jobid = jobid;
|
||||
this.setupProgress = setupProgress;
|
||||
this.mapProgress = mapProgress;
|
||||
this.reduceProgress = reduceProgress;
|
||||
this.cleanupProgress = cleanupProgress;
|
||||
this.runState = runState;
|
||||
this.user = user;
|
||||
this.queue = queue;
|
||||
if (jp == null) {
|
||||
throw new IllegalArgumentException("Job Priority cannot be null.");
|
||||
}
|
||||
priority = jp;
|
||||
this.jobName = jobName;
|
||||
this.jobFile = jobFile;
|
||||
this.trackingUrl = trackingUrl;
|
||||
this.isUber = isUber;
|
||||
* Create a job status object for a given jobid.
|
||||
* @param jobid The jobid of the job
|
||||
* @param setupProgress The progress made on the setup
|
||||
* @param mapProgress The progress made on the maps
|
||||
* @param reduceProgress The progress made on the reduces
|
||||
* @param cleanupProgress The progress made on the cleanup
|
||||
* @param runState The current state of the job
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param queue queue name
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
* @param isUber Whether job running in uber mode
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
State runState, JobPriority jp,
|
||||
String user, String jobName, String queue,
|
||||
String jobFile, String trackingUrl, boolean isUber) {
|
||||
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
|
||||
runState, jp, user, jobName, queue, jobFile, trackingUrl, isUber, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a job status object for a given jobid.
|
||||
* @param jobid The jobid of the job
|
||||
* @param setupProgress The progress made on the setup
|
||||
* @param mapProgress The progress made on the maps
|
||||
* @param reduceProgress The progress made on the reduces
|
||||
* @param cleanupProgress The progress made on the cleanup
|
||||
* @param runState The current state of the job
|
||||
* @param jp Priority of the job.
|
||||
* @param user userid of the person who submitted the job.
|
||||
* @param jobName user-specified job name.
|
||||
* @param queue queue name
|
||||
* @param jobFile job configuration file.
|
||||
* @param trackingUrl link to the web-ui for details of the job.
|
||||
* @param isUber Whether job running in uber mode
|
||||
* @param historyFile history file
|
||||
*/
|
||||
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
|
||||
float reduceProgress, float cleanupProgress,
|
||||
State runState, JobPriority jp,
|
||||
String user, String jobName, String queue,
|
||||
String jobFile, String trackingUrl, boolean isUber,
|
||||
String historyFile) {
|
||||
this.jobid = jobid;
|
||||
this.setupProgress = setupProgress;
|
||||
this.mapProgress = mapProgress;
|
||||
this.reduceProgress = reduceProgress;
|
||||
this.cleanupProgress = cleanupProgress;
|
||||
this.runState = runState;
|
||||
this.user = user;
|
||||
this.queue = queue;
|
||||
if (jp == null) {
|
||||
throw new IllegalArgumentException("Job Priority cannot be null.");
|
||||
}
|
||||
priority = jp;
|
||||
this.jobName = jobName;
|
||||
this.jobFile = jobFile;
|
||||
this.trackingUrl = trackingUrl;
|
||||
this.isUber = isUber;
|
||||
this.historyFile = historyFile;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
|
@ -95,7 +95,7 @@ public int run(String[] argv) throws Exception {
|
||||
String submitJobFile = null;
|
||||
String jobid = null;
|
||||
String taskid = null;
|
||||
String historyFile = null;
|
||||
String historyFileOrJobId = null;
|
||||
String historyOutFile = null;
|
||||
String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
|
||||
String counterGroupName = null;
|
||||
@ -188,15 +188,15 @@ public int run(String[] argv) throws Exception {
|
||||
// Some arguments are optional while others are not, and some require
|
||||
// second arguments. Due to this, the indexing can vary depending on
|
||||
// what's specified and what's left out, as summarized in the below table:
|
||||
// [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]
|
||||
// 1 2 3 4 5 6
|
||||
// 1 2 3 4
|
||||
// 1 2 3 4
|
||||
// 1 2
|
||||
// 1 2 3 4 5
|
||||
// 1 2 3
|
||||
// 1 2 3
|
||||
// 1
|
||||
// [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]
|
||||
// 1 2 3 4 5 6
|
||||
// 1 2 3 4
|
||||
// 1 2 3 4
|
||||
// 1 2
|
||||
// 1 2 3 4 5
|
||||
// 1 2 3
|
||||
// 1 2 3
|
||||
// 1
|
||||
|
||||
// "all" is optional, but comes first if specified
|
||||
int index = 1;
|
||||
@ -208,8 +208,8 @@ public int run(String[] argv) throws Exception {
|
||||
return exitCode;
|
||||
}
|
||||
}
|
||||
// Get the job history file argument
|
||||
historyFile = argv[index++];
|
||||
// Get the job history file or job id argument
|
||||
historyFileOrJobId = argv[index++];
|
||||
// "-outfile" is optional, but if specified requires a second argument
|
||||
if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
|
||||
index++;
|
||||
@ -379,9 +379,28 @@ public int run(String[] argv) throws Exception {
|
||||
exitCode = 0;
|
||||
}
|
||||
} else if (viewHistory) {
|
||||
viewHistory(historyFile, viewAllHistory, historyOutFile,
|
||||
historyOutFormat);
|
||||
exitCode = 0;
|
||||
// If it ends with .jhist, assume it's a jhist file; otherwise, assume
|
||||
// it's a Job ID
|
||||
if (historyFileOrJobId.endsWith(".jhist")) {
|
||||
viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile,
|
||||
historyOutFormat);
|
||||
exitCode = 0;
|
||||
} else {
|
||||
Job job = getJob(JobID.forName(historyFileOrJobId));
|
||||
if (job == null) {
|
||||
System.out.println("Could not find job " + jobid);
|
||||
} else {
|
||||
String historyUrl = job.getHistoryUrl();
|
||||
if (historyUrl == null || historyUrl.isEmpty()) {
|
||||
System.out.println("History file for job " + historyFileOrJobId +
|
||||
" is currently unavailable.");
|
||||
} else {
|
||||
viewHistory(historyUrl, viewAllHistory, historyOutFile,
|
||||
historyOutFormat);
|
||||
exitCode = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (listEvents) {
|
||||
listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
|
||||
exitCode = 0;
|
||||
@ -493,8 +512,8 @@ private void displayUsage(String cmd) {
|
||||
System.err.println(prefix + "[" + cmd +
|
||||
" <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
|
||||
} else if ("-history".equals(cmd)) {
|
||||
System.err.println(prefix + "[" + cmd +
|
||||
" [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]]");
|
||||
System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " +
|
||||
"[-outfile <file>] [-format <human|json>]]");
|
||||
} else if ("-list".equals(cmd)) {
|
||||
System.err.println(prefix + "[" + cmd + " [all]]");
|
||||
} else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
|
||||
@ -527,8 +546,8 @@ private void displayUsage(String cmd) {
|
||||
"Valid values for priorities are: " + jobPriorityValues +
|
||||
". In addition to this, integers also can be used." + "%n");
|
||||
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
|
||||
System.err.printf("\t[-history [all] <jobHistoryFile> [-outfile <file>]" +
|
||||
" [-format <human|json>]]%n");
|
||||
System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " +
|
||||
"[-outfile <file>] [-format <human|json>]]%n");
|
||||
System.err.printf("\t[-list [all]]%n");
|
||||
System.err.printf("\t[-list-active-trackers]%n");
|
||||
System.err.printf("\t[-list-blacklisted-trackers]%n");
|
||||
|
@ -81,7 +81,7 @@ Copy file or directories recursively. More information can be found at
|
||||
|
||||
Command to interact with Map Reduce Jobs.
|
||||
|
||||
Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>] | [-counter <job-id> <group-name> <counter-name>] | [-kill <job-id>] | [-events <job-id> <from-event-#> <#-of-events>] | [-history [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]] | [-list [all]] | [-kill-task <task-id>] | [-fail-task <task-id>] | [-set-priority <job-id> <priority>] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids <job-id> <task-type> <task-state>] [-logs <job-id> <task-attempt-id>]`
|
||||
Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>] | [-counter <job-id> <group-name> <counter-name>] | [-kill <job-id>] | [-events <job-id> <from-event-#> <#-of-events>] | [-history [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]] | [-list [all]] | [-kill-task <task-id>] | [-fail-task <task-id>] | [-set-priority <job-id> <priority>] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids <job-id> <task-type> <task-state>] [-logs <job-id> <task-attempt-id>]`
|
||||
|
||||
| COMMAND\_OPTION | Description |
|
||||
|:---- |:---- |
|
||||
@ -90,7 +90,7 @@ Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id
|
||||
| -counter *job-id* *group-name* *counter-name* | Prints the counter value. |
|
||||
| -kill *job-id* | Kills the job. |
|
||||
| -events *job-id* *from-event-\#* *\#-of-events* | Prints the events' details received by jobtracker for the given range. |
|
||||
| -history [all] *jobHistoryFile* [-outfile *file*] [-format *human|json*] | Prints job details, failed and killed task details. More details about the job such as successful tasks, task attempts made for each task, task counters, etc can be viewed by specifying the [all] option. An optional file output path (instead of stdout) can be specified. The format defaults to human-readable but can also be changed to JSON with the [-format] option. |
|
||||
| -history [all] *jobHistoryFile|jobId* [-outfile *file*] [-format *human|json*] | Prints job details, failed and killed task details. More details about the job such as successful tasks, task attempts made for each task, task counters, etc can be viewed by specifying the [all] option. An optional file output path (instead of stdout) can be specified. The format defaults to human-readable but can also be changed to JSON with the [-format] option. |
|
||||
| -list [all] | Displays jobs which are yet to complete. `-list all` displays all jobs. |
|
||||
| -kill-task *task-id* | Kills the task. Killed tasks are NOT counted against failed attempts. |
|
||||
| -fail-task *task-id* | Fails the task. Failed tasks are counted against failed attempts. |
|
||||
|
@ -166,6 +166,7 @@ private void constructJobReport() {
|
||||
report.setTrackingUrl(historyUrl);
|
||||
report.setAMInfos(getAMInfos());
|
||||
report.setIsUber(isUber());
|
||||
report.setHistoryFile(info.getHistoryFile().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -454,10 +454,10 @@ public synchronized Job loadJob() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the history file. This should only be used for testing.
|
||||
* Return the history file.
|
||||
* @return the history file.
|
||||
*/
|
||||
synchronized Path getHistoryFile() {
|
||||
public synchronized Path getHistoryFile() {
|
||||
return historyFile;
|
||||
}
|
||||
|
||||
|
@ -90,6 +90,7 @@ public static Collection<Object[]> data() {
|
||||
public void testCompletedJob() throws Exception {
|
||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||
when(info.getHistoryFile()).thenReturn(fullHistoryPath);
|
||||
//Re-initialize to verify the delayed load.
|
||||
completedJob =
|
||||
new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
|
||||
@ -109,12 +110,14 @@ public void testCompletedJob() throws Exception {
|
||||
JobReport jobReport = completedJob.getReport();
|
||||
assertEquals("user", jobReport.getUser());
|
||||
assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
|
||||
assertEquals(fullHistoryPath.toString(), jobReport.getHistoryFile());
|
||||
}
|
||||
|
||||
@Test (timeout=100000)
|
||||
public void testCopmletedJobReportWithZeroTasks() throws Exception {
|
||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||
when(info.getHistoryFile()).thenReturn(fullHistoryPathZeroReduces);
|
||||
completedJob =
|
||||
new CompletedJob(conf, jobId, fullHistoryPathZeroReduces, loadTasks, "user",
|
||||
info, jobAclsManager);
|
||||
@ -124,6 +127,8 @@ public void testCopmletedJobReportWithZeroTasks() throws Exception {
|
||||
assertEquals(0, completedJob.getCompletedReduces());
|
||||
// Verify that the reduce progress is 1.0 (not NaN)
|
||||
assertEquals(1.0, jobReport.getReduceProgress(), 0.001);
|
||||
assertEquals(fullHistoryPathZeroReduces.toString(),
|
||||
jobReport.getHistoryFile());
|
||||
}
|
||||
|
||||
@Test (timeout=10000)
|
||||
|
@ -68,6 +68,7 @@ public void testAverageMergeTime() throws IOException {
|
||||
|
||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||
when(info.getHistoryFile()).thenReturn(fulleHistoryPath);
|
||||
|
||||
JobId jobId = MRBuilderUtils.newJobId(1329348432655l, 1, 1);
|
||||
CompletedJob completedJob =
|
||||
|
@ -29,7 +29,10 @@
|
||||
import java.io.PipedInputStream;
|
||||
import java.io.PipedOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.Assert;
|
||||
@ -83,6 +86,7 @@ private Job runJobInBackGround(Configuration conf) throws Exception {
|
||||
|
||||
public static int runTool(Configuration conf, Tool tool, String[] args,
|
||||
OutputStream out) throws Exception {
|
||||
LOG.info("args = " + Arrays.toString(args));
|
||||
PrintStream oldOut = System.out;
|
||||
PrintStream newOut = new PrintStream(out, true);
|
||||
try {
|
||||
@ -140,7 +144,7 @@ public void testJobClient() throws Exception {
|
||||
// test list of events
|
||||
testJobEvents(jobId, conf);
|
||||
// test job history
|
||||
testJobHistory(conf);
|
||||
testJobHistory(jobId, conf);
|
||||
// test tracker list
|
||||
testListTrackers(conf);
|
||||
// attempts list
|
||||
@ -354,111 +358,127 @@ private void testListTrackers(Configuration conf) throws Exception {
|
||||
/**
|
||||
* print job history from file
|
||||
*/
|
||||
private void testJobHistory(Configuration conf) throws Exception {
|
||||
private void testJobHistory(String jobId, Configuration conf)
|
||||
throws Exception {
|
||||
CLI jc = createJobClient();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
|
||||
FileSystem localFs = FileSystem.getLocal(conf);
|
||||
String historyFileUri = new Path(f.getAbsolutePath())
|
||||
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri()
|
||||
.toString();
|
||||
|
||||
// Try a bunch of different valid combinations of the command to test
|
||||
// argument parsing
|
||||
int exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
"all",
|
||||
historyFileUri,
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanOutput(out);
|
||||
File outFile = File.createTempFile("myout", ".txt");
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
"all",
|
||||
historyFileUri,
|
||||
"-outfile",
|
||||
outFile.getAbsolutePath()
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanFileOutput(out, outFile);
|
||||
outFile = File.createTempFile("myout", ".txt");
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
"all",
|
||||
historyFileUri,
|
||||
"-outfile",
|
||||
outFile.getAbsolutePath(),
|
||||
"-format",
|
||||
"human"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanFileOutput(out, outFile);
|
||||
// Find jhist file
|
||||
String historyFileUri = null;
|
||||
RemoteIterator<LocatedFileStatus> it =
|
||||
getFileSystem().listFiles(new Path("/"), true);
|
||||
while (it.hasNext() && historyFileUri == null) {
|
||||
LocatedFileStatus file = it.next();
|
||||
if (file.getPath().getName().endsWith(".jhist")) {
|
||||
historyFileUri = file.getPath().toUri().toString();
|
||||
}
|
||||
}
|
||||
assertNotNull("Could not find jhist file", historyFileUri);
|
||||
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
historyFileUri,
|
||||
"-format",
|
||||
"human"
|
||||
for (String historyFileOrJobId : new String[]{historyFileUri, jobId}) {
|
||||
// Try a bunch of different valid combinations of the command
|
||||
int exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
"all",
|
||||
historyFileOrJobId,
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanOutput(out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanOutput(jobId, out);
|
||||
File outFile = File.createTempFile("myout", ".txt");
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
"all",
|
||||
historyFileOrJobId,
|
||||
"-outfile",
|
||||
outFile.getAbsolutePath()
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanFileOutput(jobId, out, outFile);
|
||||
outFile = File.createTempFile("myout", ".txt");
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
"all",
|
||||
historyFileOrJobId,
|
||||
"-outfile",
|
||||
outFile.getAbsolutePath(),
|
||||
"-format",
|
||||
"human"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanFileOutput(jobId, out, outFile);
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileOrJobId,
|
||||
"-format",
|
||||
"human"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryHumanOutput(jobId, out);
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
"all",
|
||||
historyFileOrJobId,
|
||||
"-format",
|
||||
"json"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryJSONOutput(jobId, out);
|
||||
outFile = File.createTempFile("myout", ".txt");
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
"all",
|
||||
historyFileOrJobId,
|
||||
"-outfile",
|
||||
outFile.getAbsolutePath(),
|
||||
"-format",
|
||||
"json"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryJSONFileOutput(jobId, out, outFile);
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileOrJobId,
|
||||
"-format",
|
||||
"json"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryJSONOutput(jobId, out);
|
||||
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
"all",
|
||||
historyFileUri,
|
||||
"-format",
|
||||
"json"
|
||||
// Check some bad arguments
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileOrJobId,
|
||||
"foo"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryJSONOutput(out);
|
||||
outFile = File.createTempFile("myout", ".txt");
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
"all",
|
||||
historyFileUri,
|
||||
"-outfile",
|
||||
outFile.getAbsolutePath(),
|
||||
"-format",
|
||||
"json"
|
||||
assertEquals("Exit code", -1, exitCode);
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileOrJobId,
|
||||
"-format"
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryJSONFileOutput(out, outFile);
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
historyFileUri,
|
||||
"-format",
|
||||
"json"
|
||||
assertEquals("Exit code", -1, exitCode);
|
||||
exitCode = runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileOrJobId,
|
||||
"-outfile",
|
||||
}, out);
|
||||
assertEquals("Exit code", 0, exitCode);
|
||||
checkHistoryJSONOutput(out);
|
||||
|
||||
// Check some bad arguments
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
historyFileUri,
|
||||
"foo"
|
||||
}, out);
|
||||
assertEquals("Exit code", -1, exitCode);
|
||||
exitCode = runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
historyFileUri,
|
||||
"-format"
|
||||
}, out);
|
||||
assertEquals("Exit code", -1, exitCode);
|
||||
runTool(conf, jc, new String[] {
|
||||
"-history",
|
||||
historyFileUri,
|
||||
"-outfile",
|
||||
}, out);
|
||||
assertEquals("Exit code", -1, exitCode);
|
||||
assertEquals("Exit code", -1, exitCode);
|
||||
try {
|
||||
runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileOrJobId,
|
||||
"-format",
|
||||
"foo"
|
||||
}, out);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
try {
|
||||
runTool(conf, jc, new String[]{
|
||||
"-history",
|
||||
historyFileUri,
|
||||
"-format",
|
||||
"foo"
|
||||
"not_a_valid_history_file_or_job_id",
|
||||
}, out);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
@ -466,45 +486,47 @@ private void testJobHistory(Configuration conf) throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkHistoryHumanOutput(ByteArrayOutputStream out)
|
||||
private void checkHistoryHumanOutput(String jobId, ByteArrayOutputStream out)
|
||||
throws IOException, JSONException {
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(
|
||||
new ByteArrayInputStream(out.toByteArray())));
|
||||
br.readLine();
|
||||
String line = br.readLine();
|
||||
br.close();
|
||||
assertEquals("Hadoop job: job_1329348432655_0001", line);
|
||||
assertEquals("Hadoop job: " + jobId, line);
|
||||
out.reset();
|
||||
}
|
||||
|
||||
private void checkHistoryJSONOutput(ByteArrayOutputStream out)
|
||||
private void checkHistoryJSONOutput(String jobId, ByteArrayOutputStream out)
|
||||
throws IOException, JSONException {
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(
|
||||
new ByteArrayInputStream(out.toByteArray())));
|
||||
String line = org.apache.commons.io.IOUtils.toString(br);
|
||||
br.close();
|
||||
JSONObject json = new JSONObject(line);
|
||||
assertEquals("job_1329348432655_0001", json.getString("hadoopJob"));
|
||||
assertEquals(jobId, json.getString("hadoopJob"));
|
||||
out.reset();
|
||||
}
|
||||
|
||||
private void checkHistoryHumanFileOutput(ByteArrayOutputStream out,
|
||||
File outFile) throws IOException, JSONException {
|
||||
private void checkHistoryHumanFileOutput(String jobId,
|
||||
ByteArrayOutputStream out, File outFile)
|
||||
throws IOException, JSONException {
|
||||
BufferedReader br = new BufferedReader(new FileReader(outFile));
|
||||
br.readLine();
|
||||
String line = br.readLine();
|
||||
br.close();
|
||||
assertEquals("Hadoop job: job_1329348432655_0001", line);
|
||||
assertEquals("Hadoop job: " + jobId, line);
|
||||
assertEquals(0, out.size());
|
||||
}
|
||||
|
||||
private void checkHistoryJSONFileOutput(ByteArrayOutputStream out,
|
||||
File outFile) throws IOException, JSONException {
|
||||
private void checkHistoryJSONFileOutput(String jobId,
|
||||
ByteArrayOutputStream out, File outFile)
|
||||
throws IOException, JSONException {
|
||||
BufferedReader br = new BufferedReader(new FileReader(outFile));
|
||||
String line = org.apache.commons.io.IOUtils.toString(br);
|
||||
br.close();
|
||||
JSONObject json = new JSONObject(line);
|
||||
assertEquals("job_1329348432655_0001", json.getString("hadoopJob"));
|
||||
assertEquals(jobId, json.getString("hadoopJob"));
|
||||
assertEquals(0, out.size());
|
||||
}
|
||||
|
||||
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue
Block a user