MAPREDUCE-4785. TestMRApp occasionally fails (haibochen via rkanter)
This commit is contained in:
parent
0a9f00af5e
commit
ff0ee84d77
@ -332,6 +332,8 @@ Release 2.9.0 - UNRELEASED
|
|||||||
MAPREDUCE-6620. Jobs that did not start are shown as starting in 1969 in
|
MAPREDUCE-6620. Jobs that did not start are shown as starting in 1969 in
|
||||||
the JHS web UI (haibochen via rkanter)
|
the JHS web UI (haibochen via rkanter)
|
||||||
|
|
||||||
|
MAPREDUCE-4785. TestMRApp occasionally fails (haibochen via rkanter)
|
||||||
|
|
||||||
Release 2.8.0 - UNRELEASED
|
Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -25,7 +25,10 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -205,10 +208,10 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
|
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
|
||||||
// uberization forces full slowstart (1.0), so disable that
|
// uberization forces full slowstart (1.0), so disable that
|
||||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
Job job = app.submit(conf);
|
final Job job1 = app.submit(conf);
|
||||||
app.waitForState(job, JobState.RUNNING);
|
app.waitForState(job1, JobState.RUNNING);
|
||||||
Assert.assertEquals("Num tasks not correct", 4, job.getTasks().size());
|
Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size());
|
||||||
Iterator<Task> it = job.getTasks().values().iterator();
|
Iterator<Task> it = job1.getTasks().values().iterator();
|
||||||
Task mapTask1 = it.next();
|
Task mapTask1 = it.next();
|
||||||
Task mapTask2 = it.next();
|
Task mapTask2 = it.next();
|
||||||
|
|
||||||
@ -240,8 +243,20 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0,
|
final int checkIntervalMillis = 100;
|
||||||
100);
|
final int waitForMillis = 800;
|
||||||
|
|
||||||
|
waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
TaskAttemptCompletionEvent[] events = job1
|
||||||
|
.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
return events.length == 2;
|
||||||
|
}
|
||||||
|
}, checkIntervalMillis, waitForMillis);
|
||||||
|
|
||||||
|
TaskAttemptCompletionEvent[] events = job1.getTaskAttemptCompletionEvents
|
||||||
|
(0, 100);
|
||||||
Assert.assertEquals("Expecting 2 completion events for success", 2,
|
Assert.assertEquals("Expecting 2 completion events for success", 2,
|
||||||
events.length);
|
events.length);
|
||||||
|
|
||||||
@ -253,12 +268,21 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
nr.setNodeState(NodeState.UNHEALTHY);
|
nr.setNodeState(NodeState.UNHEALTHY);
|
||||||
updatedNodes.add(nr);
|
updatedNodes.add(nr);
|
||||||
app.getContext().getEventHandler()
|
app.getContext().getEventHandler()
|
||||||
.handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes));
|
.handle(new JobUpdatedNodesEvent(job1.getID(), updatedNodes));
|
||||||
|
|
||||||
app.waitForState(task1Attempt, TaskAttemptState.KILLED);
|
app.waitForState(task1Attempt, TaskAttemptState.KILLED);
|
||||||
app.waitForState(task2Attempt, TaskAttemptState.KILLED);
|
app.waitForState(task2Attempt, TaskAttemptState.KILLED);
|
||||||
|
|
||||||
events = job.getTaskAttemptCompletionEvents(0, 100);
|
waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
TaskAttemptCompletionEvent[] events = job1
|
||||||
|
.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
return events.length == 4;
|
||||||
|
}
|
||||||
|
}, checkIntervalMillis, waitForMillis);
|
||||||
|
|
||||||
|
events = job1.getTaskAttemptCompletionEvents(0, 100);
|
||||||
Assert.assertEquals("Expecting 2 more completion events for killed", 4,
|
Assert.assertEquals("Expecting 2 more completion events for killed", 4,
|
||||||
events.length);
|
events.length);
|
||||||
|
|
||||||
@ -281,7 +305,16 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
app.waitForState(mapTask2, TaskState.RUNNING);
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
events = job.getTaskAttemptCompletionEvents(0, 100);
|
waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
TaskAttemptCompletionEvent[] events = job1
|
||||||
|
.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
return events.length == 5;
|
||||||
|
}
|
||||||
|
}, checkIntervalMillis, waitForMillis);
|
||||||
|
|
||||||
|
events = job1.getTaskAttemptCompletionEvents(0, 100);
|
||||||
Assert.assertEquals("Expecting 1 more completion events for success", 5,
|
Assert.assertEquals("Expecting 1 more completion events for success", 5,
|
||||||
events.length);
|
events.length);
|
||||||
|
|
||||||
@ -295,10 +328,11 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
job = app.submit(conf);
|
|
||||||
app.waitForState(job, JobState.RUNNING);
|
final Job job2 = app.submit(conf);
|
||||||
Assert.assertEquals("No of tasks not correct", 4, job.getTasks().size());
|
app.waitForState(job2, JobState.RUNNING);
|
||||||
it = job.getTasks().values().iterator();
|
Assert.assertEquals("No of tasks not correct", 4, job2.getTasks().size());
|
||||||
|
it = job2.getTasks().values().iterator();
|
||||||
mapTask1 = it.next();
|
mapTask1 = it.next();
|
||||||
mapTask2 = it.next();
|
mapTask2 = it.next();
|
||||||
Task reduceTask1 = it.next();
|
Task reduceTask1 = it.next();
|
||||||
@ -308,7 +342,16 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
app.waitForState(mapTask2, TaskState.RUNNING);
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
events = job.getTaskAttemptCompletionEvents(0, 100);
|
waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
TaskAttemptCompletionEvent[] events = job2
|
||||||
|
.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
return events.length == 2;
|
||||||
|
}
|
||||||
|
}, checkIntervalMillis, waitForMillis);
|
||||||
|
|
||||||
|
events = job2.getTaskAttemptCompletionEvents(0, 100);
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"Expecting 2 completion events for killed & success of map1", 2,
|
"Expecting 2 completion events for killed & success of map1", 2,
|
||||||
events.length);
|
events.length);
|
||||||
@ -321,7 +364,16 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
TaskAttemptEventType.TA_DONE));
|
TaskAttemptEventType.TA_DONE));
|
||||||
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
events = job.getTaskAttemptCompletionEvents(0, 100);
|
waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
TaskAttemptCompletionEvent[] events = job2
|
||||||
|
.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
return events.length == 3;
|
||||||
|
}
|
||||||
|
}, checkIntervalMillis, waitForMillis);
|
||||||
|
|
||||||
|
events = job2.getTaskAttemptCompletionEvents(0, 100);
|
||||||
Assert.assertEquals("Expecting 1 more completion events for success", 3,
|
Assert.assertEquals("Expecting 1 more completion events for success", 3,
|
||||||
events.length);
|
events.length);
|
||||||
|
|
||||||
@ -352,12 +404,28 @@ public void testUpdatedNodes() throws Exception {
|
|||||||
TaskAttemptEventType.TA_DONE));
|
TaskAttemptEventType.TA_DONE));
|
||||||
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
|
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
events = job.getTaskAttemptCompletionEvents(0, 100);
|
waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
TaskAttemptCompletionEvent[] events = job2
|
||||||
|
.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
return events.length == 5;
|
||||||
|
}
|
||||||
|
}, checkIntervalMillis, waitForMillis);
|
||||||
|
events = job2.getTaskAttemptCompletionEvents(0, 100);
|
||||||
Assert.assertEquals("Expecting 2 more completion events for reduce success",
|
Assert.assertEquals("Expecting 2 more completion events for reduce success",
|
||||||
5, events.length);
|
5, events.length);
|
||||||
|
|
||||||
// job succeeds
|
// job succeeds
|
||||||
app.waitForState(job, JobState.SUCCEEDED);
|
app.waitForState(job2, JobState.SUCCEEDED);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void waitFor(Supplier<Boolean> predicate, int
|
||||||
|
checkIntervalMillis, int checkTotalMillis) throws InterruptedException {
|
||||||
|
try {
|
||||||
|
GenericTestUtils.waitFor(predicate, checkIntervalMillis, checkTotalMillis);
|
||||||
|
} catch (TimeoutException ex) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user