MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1357723 13f79535-47bb-0310-9956-ffa450edef68
commit 72fb517469
parent 4d0cab2729
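In short, this change exposes a point-in-time JobStatus snapshot on the old mapred RunningJob API by delegating to the new mapreduce Job handle. A minimal, hypothetical usage sketch (the job setup is illustrative and error handling is elided; this code is not part of the commit):

import java.io.IOException;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

public class GetJobStatusExample {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();             // assumes cluster config on the classpath
    JobClient client = new JobClient(conf);
    RunningJob job = client.submitJob(conf);  // a real job would set input/output paths etc.

    // New in this commit: a snapshot of the job's status via the old API.
    JobStatus status = job.getJobStatus();
    System.out.println("Job " + status.getJobID()
        + " started at " + status.getStartTime());
  }
}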
hadoop-mapreduce-project/CHANGES.txt
@@ -132,6 +132,8 @@ Branch-2 ( Unreleased changes )
 
   NEW FEATURES
 
+    MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-4146. Support limits on task status string length and number of
org/apache/hadoop/mapred/JobClient.java
@@ -441,6 +441,14 @@ public String getFailureInfo() throws IOException {
       }
     }
 
+    @Override
+    public JobStatus getJobStatus() throws IOException {
+      try {
+        return JobStatus.downgrade(job.getStatus());
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
   }
 
   /**
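The implementation above is the old-to-new API bridge NetworkedJob uses throughout: the wrapped org.apache.hadoop.mapreduce.Job returns a new-API status (and may throw InterruptedException), which is downgraded to the old mapred type, with the exception rewrapped to honor RunningJob's IOException-only contract. A standalone sketch of that same pattern, extracted for illustration:

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;

// Same bridge as in NetworkedJob above, pulled out into a helper.
public class StatusBridge {
  public static org.apache.hadoop.mapred.JobStatus oldApiStatus(Job job)
      throws IOException {
    try {
      // downgrade() converts the new-API JobStatus to the old mapred type.
      return org.apache.hadoop.mapred.JobStatus.downgrade(job.getStatus());
    } catch (InterruptedException ie) {
      // Old-API methods only declare IOException, so wrap and rethrow.
      throw new IOException(ie);
    }
  }
}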
org/apache/hadoop/mapred/RunningJob.java
@@ -149,8 +149,16 @@ public interface RunningJob {
   public int getJobState() throws IOException;
 
   /**
-   * Kill the running job. Blocks until all job tasks have been
-   * killed as well. If the job is no longer running, it simply returns.
+   * Returns a snapshot of the current status, {@link JobStatus}, of the Job.
+   * Need to call again for latest information.
+   *
+   * @throws IOException
+   */
+  public JobStatus getJobStatus() throws IOException;
+
+  /**
+   * Kill the running job. Blocks until all job tasks have been killed as well.
+   * If the job is no longer running, it simply returns.
    *
    * @throws IOException
    */
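Since the new javadoc specifies snapshot semantics ("need to call again for latest information"), a typical caller re-fetches the status in a loop. A hypothetical polling sketch (the interval and progress printing are illustrative; isComplete() is a pre-existing RunningJob method, and mapProgress()/reduceProgress() are assumed from the old mapred JobStatus API):

import java.io.IOException;

import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

public class PollJobStatus {
  public static void waitForJob(RunningJob job)
      throws IOException, InterruptedException {
    while (!job.isComplete()) {
      // Each call returns a fresh snapshot.
      JobStatus status = job.getJobStatus();
      System.out.printf("map %.0f%%, reduce %.0f%%%n",
          status.mapProgress() * 100, status.reduceProgress() * 100);
      Thread.sleep(5000);
    }
  }
}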
org/apache/hadoop/mapred/TestNetworkedJob.java
@@ -18,16 +18,31 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-import java.util.List;
+import java.io.File;
+import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
 
 
 public class TestNetworkedJob {
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+  private static Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+  private static Path inFile = new Path(testDir, "in");
+  private static Path outDir = new Path(testDir, "out");
 
   @SuppressWarnings("deprecation")
   @Test
@@ -41,4 +56,53 @@ public void testGetNullCounters() throws Exception {
     //verification
     verify(mockJob).getCounters();
   }
+
+  @Test
+  public void testGetJobStatus() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    MiniMRClientCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
+          new Configuration());
+
+      JobConf job = new JobConf(mr.getConfig());
+
+      fileSys = FileSystem.get(job);
+      fileSys.delete(testDir, true);
+      FSDataOutputStream out = fileSys.create(inFile, true);
+      out.writeBytes("This is a test file");
+      out.close();
+
+      FileInputFormat.setInputPaths(job, inFile);
+      FileOutputFormat.setOutputPath(job, outDir);
+
+      job.setInputFormat(TextInputFormat.class);
+      job.setOutputFormat(TextOutputFormat.class);
+
+      job.setMapperClass(IdentityMapper.class);
+      job.setReducerClass(IdentityReducer.class);
+      job.setNumReduceTasks(0);
+
+      JobClient client = new JobClient(mr.getConfig());
+      RunningJob rj = client.submitJob(job);
+      JobID jobId = rj.getID();
+
+      // The following asserts read JobStatus twice and ensure the returned
+      // JobStatus objects correspond to the same Job.
+      assertEquals("Expected matching JobIDs", jobId, client.getJob(jobId)
+          .getJobStatus().getJobID());
+      assertEquals("Expected matching startTimes", rj.getJobStatus()
+          .getStartTime(), client.getJob(jobId).getJobStatus()
+          .getStartTime());
+    } finally {
+      if (fileSys != null) {
+        fileSys.delete(testDir, true);
+      }
+      if (mr != null) {
+        mr.stop();
+      }
+    }
+  }
 }