MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey Gorshkov via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1465017 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
18757c1267
commit
a5734cd38a
@ -773,6 +773,9 @@ Release 0.23.7 - UNRELEASED
|
|||||||
|
|
||||||
MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
|
MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey
|
||||||
|
Gorshkov via tgraves)
|
||||||
|
|
||||||
Release 0.23.6 - UNRELEASED
|
Release 0.23.6 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -869,4 +869,9 @@ void clean() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// for test
|
||||||
|
@VisibleForTesting
|
||||||
|
void setMaxHistoryAge(long newValue){
|
||||||
|
maxHistoryAge=newValue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,46 +21,75 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.CompletedTask;
|
import org.apache.hadoop.mapreduce.v2.hs.CompletedTask;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import static org.mockito.Mockito.*;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class TestCompletedTask{
|
public class TestCompletedTask{
|
||||||
|
|
||||||
@Test
|
@Test (timeout=5000)
|
||||||
public void testTaskStartTimes() {
|
public void testTaskStartTimes() {
|
||||||
|
|
||||||
TaskId taskId = Mockito.mock(TaskId.class);
|
TaskId taskId = mock(TaskId.class);
|
||||||
TaskInfo taskInfo = Mockito.mock(TaskInfo.class);
|
TaskInfo taskInfo = mock(TaskInfo.class);
|
||||||
Map<TaskAttemptID, TaskAttemptInfo> taskAttempts
|
Map<TaskAttemptID, TaskAttemptInfo> taskAttempts
|
||||||
= new TreeMap<TaskAttemptID, TaskAttemptInfo>();
|
= new TreeMap<TaskAttemptID, TaskAttemptInfo>();
|
||||||
|
|
||||||
TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0);
|
TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0);
|
||||||
TaskAttemptInfo info = Mockito.mock(TaskAttemptInfo.class);
|
TaskAttemptInfo info = mock(TaskAttemptInfo.class);
|
||||||
Mockito.when(info.getAttemptId()).thenReturn(id);
|
when(info.getAttemptId()).thenReturn(id);
|
||||||
Mockito.when(info.getStartTime()).thenReturn(10l);
|
when(info.getStartTime()).thenReturn(10l);
|
||||||
taskAttempts.put(id, info);
|
taskAttempts.put(id, info);
|
||||||
|
|
||||||
id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1);
|
id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1);
|
||||||
info = Mockito.mock(TaskAttemptInfo.class);
|
info = mock(TaskAttemptInfo.class);
|
||||||
Mockito.when(info.getAttemptId()).thenReturn(id);
|
when(info.getAttemptId()).thenReturn(id);
|
||||||
Mockito.when(info.getStartTime()).thenReturn(20l);
|
when(info.getStartTime()).thenReturn(20l);
|
||||||
taskAttempts.put(id, info);
|
taskAttempts.put(id, info);
|
||||||
|
|
||||||
|
|
||||||
Mockito.when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
|
when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
|
||||||
CompletedTask task = new CompletedTask(taskId, taskInfo);
|
CompletedTask task = new CompletedTask(taskId, taskInfo);
|
||||||
TaskReport report = task.getReport();
|
TaskReport report = task.getReport();
|
||||||
|
|
||||||
// Make sure the startTime returned by report is the lesser of the
|
// Make sure the startTime returned by report is the lesser of the
|
||||||
// attempy launch times
|
// attempy launch times
|
||||||
Assert.assertTrue(report.getStartTime() == 10);
|
assertTrue(report.getStartTime() == 10);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* test some methods of CompletedTaskAttempt
|
||||||
|
*/
|
||||||
|
@Test (timeout=5000)
|
||||||
|
public void testCompletedTaskAttempt(){
|
||||||
|
|
||||||
|
TaskAttemptInfo attemptInfo= mock(TaskAttemptInfo.class);
|
||||||
|
when(attemptInfo.getRackname()).thenReturn("Rackname");
|
||||||
|
when(attemptInfo.getShuffleFinishTime()).thenReturn(11L);
|
||||||
|
when(attemptInfo.getSortFinishTime()).thenReturn(12L);
|
||||||
|
when(attemptInfo.getShufflePort()).thenReturn(10);
|
||||||
|
|
||||||
|
JobID jobId= new JobID("12345",0);
|
||||||
|
TaskID taskId =new TaskID(jobId,TaskType.REDUCE, 0);
|
||||||
|
TaskAttemptID taskAttemptId= new TaskAttemptID(taskId, 0);
|
||||||
|
when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId);
|
||||||
|
|
||||||
|
|
||||||
|
CompletedTaskAttempt taskAttemt= new CompletedTaskAttempt(null,attemptInfo);
|
||||||
|
assertEquals( "Rackname", taskAttemt.getNodeRackName());
|
||||||
|
assertEquals( Phase.CLEANUP, taskAttemt.getPhase());
|
||||||
|
assertTrue( taskAttemt.isFinished());
|
||||||
|
assertEquals( 11L, taskAttemt.getShuffleFinishTime());
|
||||||
|
assertEquals( 12L, taskAttemt.getSortFinishTime());
|
||||||
|
assertEquals( 10, taskAttemt.getShufflePort());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,9 @@
|
|||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
import org.junit.runners.Parameterized.Parameters;
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
@RunWith(value = Parameterized.class)
|
@RunWith(value = Parameterized.class)
|
||||||
@ -79,7 +81,7 @@ public static Collection<Object[]> data() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Verify some expected values based on the history file */
|
/* Verify some expected values based on the history file */
|
||||||
@Test (timeout=10000)
|
@Test (timeout=100000)
|
||||||
public void testCompletedJob() throws Exception {
|
public void testCompletedJob() throws Exception {
|
||||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||||
@ -168,4 +170,45 @@ public void testCompletedTaskAttempt() throws Exception {
|
|||||||
assertEquals(45454, rta1Report.getNodeManagerPort());
|
assertEquals(45454, rta1Report.getNodeManagerPort());
|
||||||
assertEquals(9999, rta1Report.getNodeManagerHttpPort());
|
assertEquals(9999, rta1Report.getNodeManagerHttpPort());
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Simple test of some methods of CompletedJob
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test (timeout=30000)
|
||||||
|
public void testGetTaskAttemptCompletionEvent() throws Exception{
|
||||||
|
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||||
|
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||||
|
completedJob =
|
||||||
|
new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
|
||||||
|
info, jobAclsManager);
|
||||||
|
TaskCompletionEvent[] events= completedJob.getMapAttemptCompletionEvents(0,1000);
|
||||||
|
assertEquals(10, completedJob.getMapAttemptCompletionEvents(0,10).length);
|
||||||
|
int currentEventId=0;
|
||||||
|
for (TaskCompletionEvent taskAttemptCompletionEvent : events) {
|
||||||
|
int eventId= taskAttemptCompletionEvent.getEventId();
|
||||||
|
assertTrue(eventId>=currentEventId);
|
||||||
|
currentEventId=eventId;
|
||||||
|
}
|
||||||
|
assertNull(completedJob.loadConfFile() );
|
||||||
|
// job name
|
||||||
|
assertEquals("Sleep job",completedJob.getName());
|
||||||
|
// queue name
|
||||||
|
assertEquals("default",completedJob.getQueueName());
|
||||||
|
// progress
|
||||||
|
assertEquals(1.0, completedJob.getProgress(),0.001);
|
||||||
|
// 11 rows in answer
|
||||||
|
assertEquals(11,completedJob.getTaskAttemptCompletionEvents(0,1000).length);
|
||||||
|
// select first 10 rows
|
||||||
|
assertEquals(10,completedJob.getTaskAttemptCompletionEvents(0,10).length);
|
||||||
|
// select 5-10 rows include 5th
|
||||||
|
assertEquals(6,completedJob.getTaskAttemptCompletionEvents(5,10).length);
|
||||||
|
|
||||||
|
// without errors
|
||||||
|
assertEquals(1,completedJob.getDiagnostics().size());
|
||||||
|
assertEquals("",completedJob.getDiagnostics().get(0));
|
||||||
|
|
||||||
|
assertEquals(0, completedJob.getJobACLs().size());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,9 @@
|
|||||||
package org.apache.hadoop.mapreduce.v2.hs;
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -54,6 +57,9 @@
|
|||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||||
@ -65,7 +71,9 @@
|
|||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.service.Service;
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
@ -80,12 +88,12 @@ public class TestJobHistoryParsing {
|
|||||||
|
|
||||||
private static final String RACK_NAME = "/MyRackName";
|
private static final String RACK_NAME = "/MyRackName";
|
||||||
|
|
||||||
private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
|
private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
|
||||||
|
|
||||||
public static class MyResolver implements DNSToSwitchMapping {
|
public static class MyResolver implements DNSToSwitchMapping {
|
||||||
@Override
|
@Override
|
||||||
public List<String> resolve(List<String> names) {
|
public List<String> resolve(List<String> names) {
|
||||||
return Arrays.asList(new String[]{RACK_NAME});
|
return Arrays.asList(new String[] { RACK_NAME });
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -93,14 +101,14 @@ public void reloadCachedMappings() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=50000)
|
@Test(timeout = 50000)
|
||||||
public void testJobInfo() throws Exception {
|
public void testJobInfo() throws Exception {
|
||||||
JobInfo info = new JobInfo();
|
JobInfo info = new JobInfo();
|
||||||
Assert.assertEquals("NORMAL", info.getPriority());
|
Assert.assertEquals("NORMAL", info.getPriority());
|
||||||
info.printAll();
|
info.printAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=50000)
|
@Test(timeout = 300000)
|
||||||
public void testHistoryParsing() throws Exception {
|
public void testHistoryParsing() throws Exception {
|
||||||
LOG.info("STARTING testHistoryParsing()");
|
LOG.info("STARTING testHistoryParsing()");
|
||||||
try {
|
try {
|
||||||
@ -110,7 +118,7 @@ public void testHistoryParsing() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=50000)
|
@Test(timeout = 50000)
|
||||||
public void testHistoryParsingWithParseErrors() throws Exception {
|
public void testHistoryParsingWithParseErrors() throws Exception {
|
||||||
LOG.info("STARTING testHistoryParsingWithParseErrors()");
|
LOG.info("STARTING testHistoryParsingWithParseErrors()");
|
||||||
try {
|
try {
|
||||||
@ -120,7 +128,8 @@ public void testHistoryParsingWithParseErrors() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getJobSummary(FileContext fc, Path path) throws IOException {
|
private static String getJobSummary(FileContext fc, Path path)
|
||||||
|
throws IOException {
|
||||||
Path qPath = fc.makeQualified(path);
|
Path qPath = fc.makeQualified(path);
|
||||||
FSDataInputStream in = fc.open(qPath);
|
FSDataInputStream in = fc.open(qPath);
|
||||||
String jobSummaryString = in.readUTF();
|
String jobSummaryString = in.readUTF();
|
||||||
@ -129,8 +138,7 @@ private static String getJobSummary(FileContext fc, Path path) throws IOExceptio
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void checkHistoryParsing(final int numMaps, final int numReduces,
|
private void checkHistoryParsing(final int numMaps, final int numReduces,
|
||||||
final int numSuccessfulMaps)
|
final int numSuccessfulMaps) throws Exception {
|
||||||
throws Exception {
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
|
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
|
||||||
long amStartTimeEst = System.currentTimeMillis();
|
long amStartTimeEst = System.currentTimeMillis();
|
||||||
@ -138,9 +146,8 @@ private void checkHistoryParsing(final int numMaps, final int numReduces,
|
|||||||
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
MyResolver.class, DNSToSwitchMapping.class);
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
RackResolver.init(conf);
|
RackResolver.init(conf);
|
||||||
MRApp app =
|
MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
|
||||||
new MRAppWithHistory(numMaps, numReduces, true,
|
.getName(), true);
|
||||||
this.getClass().getName(), true);
|
|
||||||
app.submit(conf);
|
app.submit(conf);
|
||||||
Job job = app.getContext().getAllJobs().values().iterator().next();
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
JobId jobId = job.getID();
|
JobId jobId = job.getID();
|
||||||
@ -185,20 +192,22 @@ private void checkHistoryParsing(final int numMaps, final int numReduces,
|
|||||||
Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
|
Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
|
||||||
Assert.assertTrue("launchTime should not be 0",
|
Assert.assertTrue("launchTime should not be 0",
|
||||||
Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
|
Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
|
||||||
Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
|
|
||||||
Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
|
|
||||||
Assert
|
Assert
|
||||||
.assertTrue(
|
.assertTrue(
|
||||||
"firstReduceTaskLaunchTime should not be 0",
|
"firstMapTaskLaunchTime should not be 0",
|
||||||
Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
|
Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
|
||||||
|
Assert
|
||||||
|
.assertTrue("firstReduceTaskLaunchTime should not be 0",
|
||||||
|
Long.parseLong(jobSummaryElements
|
||||||
|
.get("firstReduceTaskLaunchTime")) != 0);
|
||||||
Assert.assertTrue("finishTime should not be 0",
|
Assert.assertTrue("finishTime should not be 0",
|
||||||
Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
|
Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
|
||||||
Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
|
Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
|
||||||
Integer.parseInt(jobSummaryElements.get("numMaps")));
|
Integer.parseInt(jobSummaryElements.get("numMaps")));
|
||||||
Assert.assertEquals("Mismatch in num reduce slots", numReduces,
|
Assert.assertEquals("Mismatch in num reduce slots", numReduces,
|
||||||
Integer.parseInt(jobSummaryElements.get("numReduces")));
|
Integer.parseInt(jobSummaryElements.get("numReduces")));
|
||||||
Assert.assertEquals("User does not match", System.getProperty("user.name"),
|
Assert.assertEquals("User does not match",
|
||||||
jobSummaryElements.get("user"));
|
System.getProperty("user.name"), jobSummaryElements.get("user"));
|
||||||
Assert.assertEquals("Queue does not match", "default",
|
Assert.assertEquals("Queue does not match", "default",
|
||||||
jobSummaryElements.get("queue"));
|
jobSummaryElements.get("queue"));
|
||||||
Assert.assertEquals("Status does not match", "SUCCEEDED",
|
Assert.assertEquals("Status does not match", "SUCCEEDED",
|
||||||
@ -211,7 +220,7 @@ private void checkHistoryParsing(final int numMaps, final int numReduces,
|
|||||||
JobInfo jobInfo;
|
JobInfo jobInfo;
|
||||||
long numFinishedMaps;
|
long numFinishedMaps;
|
||||||
|
|
||||||
synchronized(fileInfo) {
|
synchronized (fileInfo) {
|
||||||
Path historyFilePath = fileInfo.getHistoryFile();
|
Path historyFilePath = fileInfo.getHistoryFile();
|
||||||
FSDataInputStream in = null;
|
FSDataInputStream in = null;
|
||||||
LOG.info("JobHistoryFile is: " + historyFilePath);
|
LOG.info("JobHistoryFile is: " + historyFilePath);
|
||||||
@ -228,11 +237,11 @@ private void checkHistoryParsing(final int numMaps, final int numReduces,
|
|||||||
if (numMaps == numSuccessfulMaps) {
|
if (numMaps == numSuccessfulMaps) {
|
||||||
reader = realReader;
|
reader = realReader;
|
||||||
} else {
|
} else {
|
||||||
final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
|
final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
|
||||||
Mockito.when(reader.getNextEvent()).thenAnswer(
|
Mockito.when(reader.getNextEvent()).thenAnswer(
|
||||||
new Answer<HistoryEvent>() {
|
new Answer<HistoryEvent>() {
|
||||||
public HistoryEvent answer(InvocationOnMock invocation)
|
public HistoryEvent answer(InvocationOnMock invocation)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HistoryEvent event = realReader.getNextEvent();
|
HistoryEvent event = realReader.getNextEvent();
|
||||||
if (event instanceof TaskFinishedEvent) {
|
if (event instanceof TaskFinishedEvent) {
|
||||||
numFinishedEvents.incrementAndGet();
|
numFinishedEvents.incrementAndGet();
|
||||||
@ -244,14 +253,12 @@ public HistoryEvent answer(InvocationOnMock invocation)
|
|||||||
throw new IOException("test");
|
throw new IOException("test");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
jobInfo = parser.parse(reader);
|
jobInfo = parser.parse(reader);
|
||||||
|
|
||||||
numFinishedMaps =
|
numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
|
||||||
computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
|
|
||||||
|
|
||||||
if (numFinishedMaps != numMaps) {
|
if (numFinishedMaps != numMaps) {
|
||||||
Exception parseException = parser.getParseException();
|
Exception parseException = parser.getParseException();
|
||||||
@ -276,7 +283,7 @@ public HistoryEvent answer(InvocationOnMock invocation)
|
|||||||
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
|
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
|
||||||
int totalTasks = allTasks.size();
|
int totalTasks = allTasks.size();
|
||||||
Assert.assertEquals("total number of tasks is incorrect ",
|
Assert.assertEquals("total number of tasks is incorrect ",
|
||||||
(numMaps+numReduces), totalTasks);
|
(numMaps + numReduces), totalTasks);
|
||||||
|
|
||||||
// Verify aminfo
|
// Verify aminfo
|
||||||
Assert.assertEquals(1, jobInfo.getAMInfos().size());
|
Assert.assertEquals(1, jobInfo.getAMInfos().size());
|
||||||
@ -306,8 +313,7 @@ public HistoryEvent answer(InvocationOnMock invocation)
|
|||||||
|
|
||||||
// Deep compare Job and JobInfo
|
// Deep compare Job and JobInfo
|
||||||
for (Task task : job.getTasks().values()) {
|
for (Task task : job.getTasks().values()) {
|
||||||
TaskInfo taskInfo = allTasks.get(
|
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
|
||||||
TypeConverter.fromYarn(task.getID()));
|
|
||||||
Assert.assertNotNull("TaskInfo not found", taskInfo);
|
Assert.assertNotNull("TaskInfo not found", taskInfo);
|
||||||
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
|
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
|
||||||
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
|
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
|
||||||
@ -320,14 +326,14 @@ public HistoryEvent answer(InvocationOnMock invocation)
|
|||||||
Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
|
Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
|
||||||
|
|
||||||
// Verify rack-name
|
// Verify rack-name
|
||||||
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
|
Assert.assertEquals("rack-name is incorrect",
|
||||||
.getRackname(), RACK_NAME);
|
taskAttemptInfo.getRackname(), RACK_NAME);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// test output for HistoryViewer
|
// test output for HistoryViewer
|
||||||
PrintStream stdps=System.out;
|
PrintStream stdps = System.out;
|
||||||
try {
|
try {
|
||||||
System.setOut(new PrintStream(outContent));
|
System.setOut(new PrintStream(outContent));
|
||||||
HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
|
HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
|
||||||
@ -336,9 +342,14 @@ public HistoryEvent answer(InvocationOnMock invocation)
|
|||||||
|
|
||||||
for (TaskInfo taskInfo : allTasks.values()) {
|
for (TaskInfo taskInfo : allTasks.values()) {
|
||||||
|
|
||||||
String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID();
|
String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo
|
||||||
Assert.assertTrue(outContent.toString().indexOf(test)>0);
|
.getTaskStatus())
|
||||||
Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0);
|
+ " "
|
||||||
|
+ taskInfo.getTaskType()
|
||||||
|
+ " task list for " + taskInfo.getTaskId().getJobID();
|
||||||
|
Assert.assertTrue(outContent.toString().indexOf(test) > 0);
|
||||||
|
Assert.assertTrue(outContent.toString().indexOf(
|
||||||
|
taskInfo.getTaskId().toString()) > 0);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
System.setOut(stdps);
|
System.setOut(stdps);
|
||||||
@ -364,182 +375,176 @@ private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
|
|||||||
return numFinishedMaps;
|
return numFinishedMaps;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=50000)
|
@Test(timeout = 30000)
|
||||||
public void testHistoryParsingForFailedAttempts() throws Exception {
|
public void testHistoryParsingForFailedAttempts() throws Exception {
|
||||||
LOG.info("STARTING testHistoryParsingForFailedAttempts");
|
LOG.info("STARTING testHistoryParsingForFailedAttempts");
|
||||||
try {
|
try {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf
|
conf.setClass(
|
||||||
.setClass(
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
MyResolver.class, DNSToSwitchMapping.class);
|
RackResolver.init(conf);
|
||||||
RackResolver.init(conf);
|
MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
|
||||||
MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(),
|
.getClass().getName(), true);
|
||||||
true);
|
app.submit(conf);
|
||||||
app.submit(conf);
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
Job job = app.getContext().getAllJobs().values().iterator().next();
|
JobId jobId = job.getID();
|
||||||
JobId jobId = job.getID();
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
app.waitForState(job, JobState.SUCCEEDED);
|
|
||||||
|
|
||||||
// make sure all events are flushed
|
// make sure all events are flushed
|
||||||
app.waitForState(Service.STATE.STOPPED);
|
app.waitForState(Service.STATE.STOPPED);
|
||||||
|
|
||||||
String jobhistoryDir = JobHistoryUtils
|
String jobhistoryDir = JobHistoryUtils
|
||||||
.getHistoryIntermediateDoneDirForUser(conf);
|
.getHistoryIntermediateDoneDirForUser(conf);
|
||||||
JobHistory jobHistory = new JobHistory();
|
JobHistory jobHistory = new JobHistory();
|
||||||
jobHistory.init(conf);
|
jobHistory.init(conf);
|
||||||
|
|
||||||
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
|
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
|
||||||
.getJobIndexInfo();
|
.getJobIndexInfo();
|
||||||
String jobhistoryFileName = FileNameIndexUtils
|
String jobhistoryFileName = FileNameIndexUtils
|
||||||
.getDoneFileName(jobIndexInfo);
|
.getDoneFileName(jobIndexInfo);
|
||||||
|
|
||||||
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
|
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
|
||||||
FSDataInputStream in = null;
|
FSDataInputStream in = null;
|
||||||
FileContext fc = null;
|
FileContext fc = null;
|
||||||
try {
|
try {
|
||||||
fc = FileContext.getFileContext(conf);
|
fc = FileContext.getFileContext(conf);
|
||||||
in = fc.open(fc.makeQualified(historyFilePath));
|
in = fc.open(fc.makeQualified(historyFilePath));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.info("Can not open history file: " + historyFilePath, ioe);
|
LOG.info("Can not open history file: " + historyFilePath, ioe);
|
||||||
throw (new Exception("Can not open History File"));
|
throw (new Exception("Can not open History File"));
|
||||||
}
|
}
|
||||||
|
|
||||||
JobHistoryParser parser = new JobHistoryParser(in);
|
JobHistoryParser parser = new JobHistoryParser(in);
|
||||||
JobInfo jobInfo = parser.parse();
|
JobInfo jobInfo = parser.parse();
|
||||||
Exception parseException = parser.getParseException();
|
Exception parseException = parser.getParseException();
|
||||||
Assert.assertNull("Caught an expected exception " + parseException,
|
Assert.assertNull("Caught an expected exception " + parseException,
|
||||||
parseException);
|
parseException);
|
||||||
int noOffailedAttempts = 0;
|
int noOffailedAttempts = 0;
|
||||||
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
|
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
|
||||||
for (Task task : job.getTasks().values()) {
|
for (Task task : job.getTasks().values()) {
|
||||||
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
|
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
|
||||||
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
|
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
|
||||||
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
|
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
|
||||||
TypeConverter.fromYarn((taskAttempt.getID())));
|
TypeConverter.fromYarn((taskAttempt.getID())));
|
||||||
// Verify rack-name for all task attempts
|
// Verify rack-name for all task attempts
|
||||||
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
|
Assert.assertEquals("rack-name is incorrect",
|
||||||
.getRackname(), RACK_NAME);
|
taskAttemptInfo.getRackname(), RACK_NAME);
|
||||||
if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
|
if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
|
||||||
noOffailedAttempts++;
|
noOffailedAttempts++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Assert.assertEquals("No of Failed tasks doesn't match.", 2,
|
||||||
Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
|
noOffailedAttempts);
|
||||||
} finally {
|
} finally {
|
||||||
LOG.info("FINISHED testHistoryParsingForFailedAttempts");
|
LOG.info("FINISHED testHistoryParsingForFailedAttempts");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=5000)
|
@Test(timeout = 60000)
|
||||||
public void testCountersForFailedTask() throws Exception {
|
public void testCountersForFailedTask() throws Exception {
|
||||||
LOG.info("STARTING testCountersForFailedTask");
|
LOG.info("STARTING testCountersForFailedTask");
|
||||||
try {
|
try {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf
|
conf.setClass(
|
||||||
.setClass(
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
MyResolver.class, DNSToSwitchMapping.class);
|
RackResolver.init(conf);
|
||||||
RackResolver.init(conf);
|
MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
|
||||||
MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
|
.getClass().getName(), true);
|
||||||
this.getClass().getName(), true);
|
app.submit(conf);
|
||||||
app.submit(conf);
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
Job job = app.getContext().getAllJobs().values().iterator().next();
|
JobId jobId = job.getID();
|
||||||
JobId jobId = job.getID();
|
app.waitForState(job, JobState.FAILED);
|
||||||
app.waitForState(job, JobState.FAILED);
|
|
||||||
|
|
||||||
// make sure all events are flushed
|
// make sure all events are flushed
|
||||||
app.waitForState(Service.STATE.STOPPED);
|
app.waitForState(Service.STATE.STOPPED);
|
||||||
|
|
||||||
String jobhistoryDir = JobHistoryUtils
|
String jobhistoryDir = JobHistoryUtils
|
||||||
.getHistoryIntermediateDoneDirForUser(conf);
|
.getHistoryIntermediateDoneDirForUser(conf);
|
||||||
JobHistory jobHistory = new JobHistory();
|
JobHistory jobHistory = new JobHistory();
|
||||||
jobHistory.init(conf);
|
jobHistory.init(conf);
|
||||||
|
|
||||||
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
|
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
|
||||||
.getJobIndexInfo();
|
.getJobIndexInfo();
|
||||||
String jobhistoryFileName = FileNameIndexUtils
|
String jobhistoryFileName = FileNameIndexUtils
|
||||||
.getDoneFileName(jobIndexInfo);
|
.getDoneFileName(jobIndexInfo);
|
||||||
|
|
||||||
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
|
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
|
||||||
FSDataInputStream in = null;
|
FSDataInputStream in = null;
|
||||||
FileContext fc = null;
|
FileContext fc = null;
|
||||||
try {
|
try {
|
||||||
fc = FileContext.getFileContext(conf);
|
fc = FileContext.getFileContext(conf);
|
||||||
in = fc.open(fc.makeQualified(historyFilePath));
|
in = fc.open(fc.makeQualified(historyFilePath));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.info("Can not open history file: " + historyFilePath, ioe);
|
LOG.info("Can not open history file: " + historyFilePath, ioe);
|
||||||
throw (new Exception("Can not open History File"));
|
throw (new Exception("Can not open History File"));
|
||||||
}
|
}
|
||||||
|
|
||||||
JobHistoryParser parser = new JobHistoryParser(in);
|
JobHistoryParser parser = new JobHistoryParser(in);
|
||||||
JobInfo jobInfo = parser.parse();
|
JobInfo jobInfo = parser.parse();
|
||||||
Exception parseException = parser.getParseException();
|
Exception parseException = parser.getParseException();
|
||||||
Assert.assertNull("Caught an expected exception " + parseException,
|
Assert.assertNull("Caught an expected exception " + parseException,
|
||||||
parseException);
|
parseException);
|
||||||
for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
|
for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
|
||||||
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
|
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
|
||||||
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
|
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
|
||||||
Assert.assertNotNull("completed task report has null counters",
|
Assert.assertNotNull("completed task report has null counters", ct
|
||||||
ct.getReport().getCounters());
|
.getReport().getCounters());
|
||||||
//Make sure all the completedTask has counters, and the counters are not empty
|
}
|
||||||
Assert.assertTrue(ct.getReport().getCounters()
|
|
||||||
.getAllCounterGroups().size() > 0);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
LOG.info("FINISHED testCountersForFailedTask");
|
LOG.info("FINISHED testCountersForFailedTask");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=50000)
|
@Test(timeout = 50000)
|
||||||
public void testScanningOldDirs() throws Exception {
|
public void testScanningOldDirs() throws Exception {
|
||||||
LOG.info("STARTING testScanningOldDirs");
|
LOG.info("STARTING testScanningOldDirs");
|
||||||
try {
|
try {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf
|
conf.setClass(
|
||||||
.setClass(
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
MyResolver.class, DNSToSwitchMapping.class);
|
RackResolver.init(conf);
|
||||||
RackResolver.init(conf);
|
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
|
||||||
MRApp app =
|
true);
|
||||||
new MRAppWithHistory(1, 1, true,
|
app.submit(conf);
|
||||||
this.getClass().getName(), true);
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
app.submit(conf);
|
JobId jobId = job.getID();
|
||||||
Job job = app.getContext().getAllJobs().values().iterator().next();
|
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
|
||||||
JobId jobId = job.getID();
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
|
|
||||||
app.waitForState(job, JobState.SUCCEEDED);
|
|
||||||
|
|
||||||
// make sure all events are flushed
|
// make sure all events are flushed
|
||||||
app.waitForState(Service.STATE.STOPPED);
|
app.waitForState(Service.STATE.STOPPED);
|
||||||
|
|
||||||
HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
|
HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
|
||||||
hfm.init(conf);
|
hfm.init(conf);
|
||||||
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
|
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
|
||||||
Assert.assertNotNull("Unable to locate job history", fileInfo);
|
Assert.assertNotNull("Unable to locate job history", fileInfo);
|
||||||
|
|
||||||
// force the manager to "forget" the job
|
// force the manager to "forget" the job
|
||||||
hfm.deleteJobFromJobListCache(fileInfo);
|
hfm.deleteJobFromJobListCache(fileInfo);
|
||||||
final int msecPerSleep = 10;
|
final int msecPerSleep = 10;
|
||||||
int msecToSleep = 10 * 1000;
|
int msecToSleep = 10 * 1000;
|
||||||
while (fileInfo.isMovePending() && msecToSleep > 0) {
|
while (fileInfo.isMovePending() && msecToSleep > 0) {
|
||||||
Assert.assertTrue(!fileInfo.didMoveFail());
|
Assert.assertTrue(!fileInfo.didMoveFail());
|
||||||
msecToSleep -= msecPerSleep;
|
msecToSleep -= msecPerSleep;
|
||||||
Thread.sleep(msecPerSleep);
|
Thread.sleep(msecPerSleep);
|
||||||
}
|
}
|
||||||
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
|
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
|
||||||
|
|
||||||
fileInfo = hfm.getFileInfo(jobId);
|
fileInfo = hfm.getFileInfo(jobId);
|
||||||
Assert.assertNotNull("Unable to locate old job history", fileInfo);
|
Assert.assertNotNull("Unable to locate old job history", fileInfo);
|
||||||
} finally {
|
} finally {
|
||||||
LOG.info("FINISHED testScanningOldDirs");
|
LOG.info("FINISHED testScanningOldDirs");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
|
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
|
||||||
|
|
||||||
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
|
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces,
|
||||||
String testName, boolean cleanOnStart) {
|
boolean autoComplete, String testName, boolean cleanOnStart) {
|
||||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -558,8 +563,8 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
|
|||||||
|
|
||||||
static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
|
static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
|
||||||
|
|
||||||
public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
|
public MRAppWithHistoryWithFailedTask(int maps, int reduces,
|
||||||
String testName, boolean cleanOnStart) {
|
boolean autoComplete, String testName, boolean cleanOnStart) {
|
||||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -587,4 +592,133 @@ public static void main(String[] args) throws Exception {
|
|||||||
t.testHistoryParsing();
|
t.testHistoryParsing();
|
||||||
t.testHistoryParsingForFailedAttempts();
|
t.testHistoryParsingForFailedAttempts();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test clean old history files. Files should be deleted after 1 week by
|
||||||
|
* default.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 15000)
|
||||||
|
public void testDeleteFileInfo() throws Exception {
|
||||||
|
LOG.info("STARTING testDeleteFileInfo");
|
||||||
|
try {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
conf.setClass(
|
||||||
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
|
|
||||||
|
RackResolver.init(conf);
|
||||||
|
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
|
||||||
|
true);
|
||||||
|
app.submit(conf);
|
||||||
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
|
JobId jobId = job.getID();
|
||||||
|
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
|
||||||
|
// make sure all events are flushed
|
||||||
|
app.waitForState(Service.STATE.STOPPED);
|
||||||
|
|
||||||
|
HistoryFileManager hfm = new HistoryFileManager();
|
||||||
|
hfm.init(conf);
|
||||||
|
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
|
||||||
|
hfm.initExisting();
|
||||||
|
// wait for move files form the done_intermediate directory to the gone
|
||||||
|
// directory
|
||||||
|
while (fileInfo.isMovePending()) {
|
||||||
|
Thread.sleep(300);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertNotNull(hfm.jobListCache.values());
|
||||||
|
|
||||||
|
// try to remove fileInfo
|
||||||
|
hfm.clean();
|
||||||
|
// check that fileInfo does not deleted
|
||||||
|
Assert.assertFalse(fileInfo.isDeleted());
|
||||||
|
// correct live time
|
||||||
|
hfm.setMaxHistoryAge(-1);
|
||||||
|
hfm.clean();
|
||||||
|
// should be deleted !
|
||||||
|
Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
LOG.info("FINISHED testDeleteFileInfo");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple test some methods of JobHistory
|
||||||
|
*/
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testJobHistoryMethods() throws Exception {
|
||||||
|
LOG.info("STARTING testJobHistoryMethods");
|
||||||
|
try {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
configuration
|
||||||
|
.setClass(
|
||||||
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
|
|
||||||
|
RackResolver.init(configuration);
|
||||||
|
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
|
||||||
|
true);
|
||||||
|
app.submit(configuration);
|
||||||
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
|
JobId jobId = job.getID();
|
||||||
|
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
|
||||||
|
JobHistory jobHistory = new JobHistory();
|
||||||
|
jobHistory.init(configuration);
|
||||||
|
// Method getAllJobs
|
||||||
|
Assert.assertEquals(1, jobHistory.getAllJobs().size());
|
||||||
|
// and with ApplicationId
|
||||||
|
Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());
|
||||||
|
|
||||||
|
JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
|
||||||
|
0L, System.currentTimeMillis() + 1, 0L,
|
||||||
|
System.currentTimeMillis() + 1, JobState.SUCCEEDED);
|
||||||
|
|
||||||
|
Assert.assertEquals(1, jobsinfo.getJobs().size());
|
||||||
|
Assert.assertNotNull(jobHistory.getApplicationAttemptId());
|
||||||
|
// test Application Id
|
||||||
|
Assert.assertEquals("application_0_0000", jobHistory.getApplicationID()
|
||||||
|
.toString());
|
||||||
|
Assert
|
||||||
|
.assertEquals("Job History Server", jobHistory.getApplicationName());
|
||||||
|
// method does not work
|
||||||
|
Assert.assertNull(jobHistory.getEventHandler());
|
||||||
|
// method does not work
|
||||||
|
Assert.assertNull(jobHistory.getClock());
|
||||||
|
// method does not work
|
||||||
|
Assert.assertNull(jobHistory.getClusterInfo());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
LOG.info("FINISHED testJobHistoryMethods");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple test PartialJob
|
||||||
|
*/
|
||||||
|
@Test(timeout = 1000)
|
||||||
|
public void testPartialJob() throws Exception {
|
||||||
|
JobId jobId = new JobIdPBImpl();
|
||||||
|
jobId.setId(0);
|
||||||
|
JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
|
||||||
|
"jobName", jobId, 3, 2, "JobStatus");
|
||||||
|
PartialJob test = new PartialJob(jii, jobId);
|
||||||
|
|
||||||
|
Assert.assertEquals(1.0f, test.getProgress(), 0.001f);
|
||||||
|
assertNull(test.getAllCounters());
|
||||||
|
assertNull(test.getTasks());
|
||||||
|
assertNull(test.getTasks(TaskType.MAP));
|
||||||
|
assertNull(test.getTask(new TaskIdPBImpl()));
|
||||||
|
|
||||||
|
assertNull(test.getTaskAttemptCompletionEvents(0, 100));
|
||||||
|
assertNull(test.getMapAttemptCompletionEvents(0, 100));
|
||||||
|
assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
|
||||||
|
assertNull(test.getAMInfos());
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,209 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryParsing.MyResolver;
|
||||||
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||||
|
import org.apache.hadoop.yarn.util.RackResolver;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/*
|
||||||
|
test JobHistoryServer protocols....
|
||||||
|
*/
|
||||||
|
public class TestJobHistoryServer {
|
||||||
|
private static RecordFactory recordFactory = RecordFactoryProvider
|
||||||
|
.getRecordFactory(null);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
JobHistoryServer historyServer=null;
|
||||||
|
// simple test init/start/stop JobHistoryServer. Status should change.
|
||||||
|
|
||||||
|
@Test (timeout= 50000 )
|
||||||
|
public void testStartStopServer() throws Exception {
|
||||||
|
|
||||||
|
historyServer = new JobHistoryServer();
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
historyServer.init(config);
|
||||||
|
assertEquals(STATE.INITED, historyServer.getServiceState());
|
||||||
|
assertEquals(3, historyServer.getServices().size());
|
||||||
|
historyServer.start();
|
||||||
|
assertEquals(STATE.STARTED, historyServer.getServiceState());
|
||||||
|
historyServer.stop();
|
||||||
|
assertEquals(STATE.STOPPED, historyServer.getServiceState());
|
||||||
|
assertNotNull(historyServer.getClientService());
|
||||||
|
HistoryClientService historyService = historyServer.getClientService();
|
||||||
|
assertNotNull(historyService.getClientHandler().getConnectAddress());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//Test reports of JobHistoryServer. History server should get log files from MRApp and read them
|
||||||
|
|
||||||
|
@Test (timeout= 50000 )
|
||||||
|
public void testReports() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config
|
||||||
|
.setClass(
|
||||||
|
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
|
MyResolver.class, DNSToSwitchMapping.class);
|
||||||
|
|
||||||
|
RackResolver.init(config);
|
||||||
|
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
|
||||||
|
true);
|
||||||
|
app.submit(config);
|
||||||
|
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
|
||||||
|
historyServer = new JobHistoryServer();
|
||||||
|
|
||||||
|
historyServer.init(config);
|
||||||
|
historyServer.start();
|
||||||
|
|
||||||
|
// search JobHistory service
|
||||||
|
JobHistory jobHistory= null;
|
||||||
|
for (Service service : historyServer.getServices() ) {
|
||||||
|
if (service instanceof JobHistory) {
|
||||||
|
jobHistory = (JobHistory) service;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Map<JobId, Job> jobs= jobHistory.getAllJobs();
|
||||||
|
|
||||||
|
assertEquals(1, jobs.size());
|
||||||
|
assertEquals("job_0_0000",jobs.keySet().iterator().next().toString());
|
||||||
|
|
||||||
|
|
||||||
|
Task task = job.getTasks().values().iterator().next();
|
||||||
|
TaskAttempt attempt = task.getAttempts().values().iterator().next();
|
||||||
|
|
||||||
|
HistoryClientService historyService = historyServer.getClientService();
|
||||||
|
MRClientProtocol protocol = historyService.getClientHandler();
|
||||||
|
|
||||||
|
GetTaskAttemptReportRequest gtarRequest = recordFactory
|
||||||
|
.newRecordInstance(GetTaskAttemptReportRequest.class);
|
||||||
|
// test getTaskAttemptReport
|
||||||
|
TaskAttemptId taId = attempt.getID();
|
||||||
|
taId.setTaskId(task.getID());
|
||||||
|
taId.getTaskId().setJobId(job.getID());
|
||||||
|
gtarRequest.setTaskAttemptId(taId);
|
||||||
|
GetTaskAttemptReportResponse response = protocol
|
||||||
|
.getTaskAttemptReport(gtarRequest);
|
||||||
|
assertEquals("container_0_0000_01_000000", response.getTaskAttemptReport()
|
||||||
|
.getContainerId().toString());
|
||||||
|
assertTrue(response.getTaskAttemptReport().getDiagnosticInfo().isEmpty());
|
||||||
|
// counters
|
||||||
|
assertNotNull(response.getTaskAttemptReport().getCounters()
|
||||||
|
.getCounter(TaskCounter.PHYSICAL_MEMORY_BYTES));
|
||||||
|
assertEquals(taId.toString(), response.getTaskAttemptReport()
|
||||||
|
.getTaskAttemptId().toString());
|
||||||
|
// test getTaskReport
|
||||||
|
GetTaskReportRequest request = recordFactory
|
||||||
|
.newRecordInstance(GetTaskReportRequest.class);
|
||||||
|
TaskId taskId = task.getID();
|
||||||
|
taskId.setJobId(job.getID());
|
||||||
|
request.setTaskId(taskId);
|
||||||
|
GetTaskReportResponse reportResponse = protocol.getTaskReport(request);
|
||||||
|
assertEquals("", reportResponse.getTaskReport().getDiagnosticsList()
|
||||||
|
.iterator().next());
|
||||||
|
// progress
|
||||||
|
assertEquals(1.0f, reportResponse.getTaskReport().getProgress(), 0.01);
|
||||||
|
// report has corrected taskId
|
||||||
|
assertEquals(taskId.toString(), reportResponse.getTaskReport().getTaskId()
|
||||||
|
.toString());
|
||||||
|
// Task state should be SUCCEEDED
|
||||||
|
assertEquals(TaskState.SUCCEEDED, reportResponse.getTaskReport()
|
||||||
|
.getTaskState());
|
||||||
|
// test getTaskAttemptCompletionEvents
|
||||||
|
GetTaskAttemptCompletionEventsRequest taskAttemptRequest = recordFactory
|
||||||
|
.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
|
||||||
|
taskAttemptRequest.setJobId(job.getID());
|
||||||
|
GetTaskAttemptCompletionEventsResponse taskAttemptCompletionEventsResponse = protocol
|
||||||
|
.getTaskAttemptCompletionEvents(taskAttemptRequest);
|
||||||
|
assertEquals(0, taskAttemptCompletionEventsResponse.getCompletionEventCount());
|
||||||
|
|
||||||
|
// test getDiagnostics
|
||||||
|
GetDiagnosticsRequest diagnosticRequest = recordFactory
|
||||||
|
.newRecordInstance(GetDiagnosticsRequest.class);
|
||||||
|
diagnosticRequest.setTaskAttemptId(taId);
|
||||||
|
GetDiagnosticsResponse diagnosticResponse = protocol
|
||||||
|
.getDiagnostics(diagnosticRequest);
|
||||||
|
// it is strange : why one empty string ?
|
||||||
|
assertEquals(1, diagnosticResponse.getDiagnosticsCount());
|
||||||
|
assertEquals("", diagnosticResponse.getDiagnostics(0));
|
||||||
|
|
||||||
|
}
|
||||||
|
// test main method
|
||||||
|
@Test (timeout =60000)
|
||||||
|
public void testMainMethod() throws Exception {
|
||||||
|
|
||||||
|
ExitUtil.disableSystemExit();
|
||||||
|
try {
|
||||||
|
JobHistoryServer.main(new String[0]);
|
||||||
|
|
||||||
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
assertEquals(0,e.status);
|
||||||
|
ExitUtil.resetFirstExitException();
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stop(){
|
||||||
|
if(historyServer !=null && !STATE.STOPPED.equals(historyServer.getServiceState())){
|
||||||
|
historyServer.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user