diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 9b6148c63d..67f8ff03b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory; * This class HAS to be in this package to access package private * methods/classes. */ -@SuppressWarnings({"unchecked"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { @@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap(); + + private ConcurrentMap> attemptIdToStatus + = new ConcurrentHashMap<>(); + private Set launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap()); @@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + AtomicReference lastStatusRef = + attemptIdToStatus.get(yarnAttemptID); + if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); + } + AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); @@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService // // isn't ever changed by the Task itself. // taskStatus.getIncludeCounters(); - context.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, - taskAttemptStatus)); + coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef); + return feedback; } @@ -520,6 +535,8 @@ public class TaskAttemptListenerImpl extends CompositeService launchedJVMs.add(jvmId); taskHeartbeatHandler.register(attemptID); + + attemptIdToStatus.put(attemptID, new AtomicReference<>()); } @Override @@ -541,6 +558,8 @@ public class TaskAttemptListenerImpl extends CompositeService //unregister this attempt taskHeartbeatHandler.unregister(attemptID); + + attemptIdToStatus.remove(attemptID); } @Override @@ -563,4 +582,46 @@ public class TaskAttemptListenerImpl extends CompositeService preemptionPolicy.setCheckpointID(tid, cid); } + private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID, + TaskAttemptStatus taskAttemptStatus, + AtomicReference lastStatusRef) { + boolean asyncUpdatedNeeded = false; + TaskAttemptStatus lastStatus = lastStatusRef.get(); + + if (lastStatus == null) { + lastStatusRef.set(taskAttemptStatus); + asyncUpdatedNeeded = true; + } else { + List oldFetchFailedMaps = + taskAttemptStatus.fetchFailedMaps; + + // merge fetchFailedMaps from the previous update + if (lastStatus.fetchFailedMaps != null) { + if (taskAttemptStatus.fetchFailedMaps == null) { + taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps; + } else { + taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps); + } + } + + if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) { + // update failed - async dispatcher has processed it in the meantime + taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps; + lastStatusRef.set(taskAttemptStatus); + asyncUpdatedNeeded = true; + } + } + + if (asyncUpdatedNeeded) { + context.getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, + lastStatusRef)); + } + } + + @VisibleForTesting + ConcurrentMap> getAttemptIdToStatus() { + return attemptIdToStatus; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java index 715f63d1be..cef4fd0509 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.event; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; @@ -26,17 +27,16 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { - - private TaskAttemptStatus reportedTaskAttemptStatus; + private AtomicReference taskAttemptStatusRef; public TaskAttemptStatusUpdateEvent(TaskAttemptId id, - TaskAttemptStatus taskAttemptStatus) { + AtomicReference taskAttemptStatusRef) { super(id, TaskAttemptEventType.TA_UPDATE); - this.reportedTaskAttemptStatus = taskAttemptStatus; + this.taskAttemptStatusRef = taskAttemptStatusRef; } - public TaskAttemptStatus getReportedTaskAttemptStatus() { - return reportedTaskAttemptStatus; + public AtomicReference getTaskAttemptStatusRef() { + return taskAttemptStatusRef; } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 90e0d2181a..431128be71 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1780,7 +1781,6 @@ public abstract class TaskAttemptImpl implements taskAttempt.updateProgressSplits(); } - static class RequestContainerTransition implements SingleArcTransition { private final boolean rescheduled; @@ -1965,6 +1965,7 @@ public abstract class TaskAttemptImpl implements // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); + //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); @@ -2430,15 +2431,20 @@ public abstract class TaskAttemptImpl implements } private static class StatusUpdater - implements SingleArcTransition { + implements SingleArcTransition { @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { - // Status update calls don't really change the state of the attempt. + TaskAttemptStatusUpdateEvent statusEvent = + ((TaskAttemptStatusUpdateEvent)event); + + AtomicReference taskAttemptStatusRef = + statusEvent.getTaskAttemptStatusRef(); + TaskAttemptStatus newReportedStatus = - ((TaskAttemptStatusUpdateEvent) event) - .getReportedTaskAttemptStatus(); + taskAttemptStatusRef.getAndSet(null); + // Now switch the information in the reportedStatus taskAttempt.reportedStatus = newReportedStatus; taskAttempt.reportedStatus.taskState = taskAttempt.getState(); @@ -2447,12 +2453,10 @@ public abstract class TaskAttemptImpl implements taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); - taskAttempt.updateProgressSplits(); - //if fetch failures are present, send the fetch failure event to job //this only will happen in reduce attempt type - if (taskAttempt.reportedStatus.fetchFailedMaps != null && + if (taskAttempt.reportedStatus.fetchFailedMaps != null && taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) { String hostname = taskAttempt.container == null ? "UNKNOWN" : taskAttempt.container.getNodeId().getHost(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index fa8418a073..4ff6fb2aa5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -35,6 +37,7 @@ import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -42,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; @@ -52,12 +57,69 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.SystemClock; - +import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + import static org.junit.Assert.*; import static org.mockito.Mockito.*; +/** + * Tests the behavior of TaskAttemptListenerImpl. + */ +@RunWith(MockitoJUnitRunner.class) public class TestTaskAttemptListenerImpl { + private static final String ATTEMPT1_ID = + "attempt_123456789012_0001_m_000001_0"; + private static final String ATTEMPT2_ID = + "attempt_123456789012_0001_m_000002_0"; + + private static final TaskAttemptId TASKATTEMPTID1 = + TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID)); + private static final TaskAttemptId TASKATTEMPTID2 = + TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID)); + + @Mock + private AppContext appCtx; + + @Mock + private JobTokenSecretManager secret; + + @Mock + private RMHeartbeatHandler rmHeartbeatHandler; + + @Mock + private TaskHeartbeatHandler hbHandler; + + @Mock + private Dispatcher dispatcher; + + @Mock + private Task task; + + @SuppressWarnings("rawtypes") + @Mock + private EventHandler ea; + + @SuppressWarnings("rawtypes") + @Captor + private ArgumentCaptor eventCaptor; + + private CheckpointAMPreemptionPolicy policy; + private JVMId id; + private WrappedJvmID wid; + private TaskAttemptID attemptID; + private TaskAttemptId attemptId; + private ReduceTaskStatus firstReduceStatus; + private ReduceTaskStatus secondReduceStatus; + private ReduceTaskStatus thirdReduceStatus; + + private MockTaskAttemptListenerImpl listener; + public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl { @@ -93,34 +155,24 @@ public class TestTaskAttemptListenerImpl { //Empty } } - + + @After + public void after() throws IOException { + if (listener != null) { + listener.close(); + listener = null; + } + } + @Test (timeout=5000) public void testGetTask() throws IOException { - AppContext appCtx = mock(AppContext.class); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - Dispatcher dispatcher = mock(Dispatcher.class); - @SuppressWarnings("unchecked") - EventHandler ea = mock(EventHandler.class); - when(dispatcher.getEventHandler()).thenReturn(ea); - - when(appCtx.getEventHandler()).thenReturn(ea); - CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); - policy.init(appCtx); - MockTaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, - rmHeartbeatHandler, hbHandler, policy); - Configuration conf = new Configuration(); - listener.init(conf); - listener.start(); - JVMId id = new JVMId("foo",1, true, 1); - WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + configureMocks(); + startListener(false); // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. JvmContext context = new JvmContext(); + context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); @@ -128,20 +180,18 @@ public class TestTaskAttemptListenerImpl { // Verify ask after registration but before launch. // Don't kill, should be null. - TaskAttemptId attemptID = mock(TaskAttemptId.class); - Task task = mock(Task.class); //Now put a task with the ID listener.registerPendingTask(task, wid); result = listener.getTask(context); assertNull(result); // Unregister for more testing. - listener.unregister(attemptID, wid); + listener.unregister(attemptId, wid); // Verify ask after registration and launch //Now put a task with the ID listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptID, wid); - verify(hbHandler).register(attemptID); + listener.registerLaunchedTask(attemptId, wid); + verify(hbHandler).register(attemptId); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); @@ -152,15 +202,13 @@ public class TestTaskAttemptListenerImpl { assertNotNull(result); assertTrue(result.shouldDie); - listener.unregister(attemptID, wid); + listener.unregister(attemptId, wid); // Verify after unregistration. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - listener.stop(); - // test JVMID JVMId jvmid = JVMId.forName("jvm_001_002_m_004"); assertNotNull(jvmid); @@ -206,20 +254,10 @@ public class TestTaskAttemptListenerImpl { when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn( TypeConverter.fromYarn(empty)); - AppContext appCtx = mock(AppContext.class); + configureMocks(); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - Dispatcher dispatcher = mock(Dispatcher.class); - @SuppressWarnings("unchecked") - EventHandler ea = mock(EventHandler.class); - when(dispatcher.getEventHandler()).thenReturn(ea); - when(appCtx.getEventHandler()).thenReturn(ea); - CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); - policy.init(appCtx); - TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl( + + listener = new MockTaskAttemptListenerImpl( appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { @@ -262,26 +300,17 @@ public class TestTaskAttemptListenerImpl { public void testCommitWindow() throws IOException { SystemClock clock = SystemClock.getInstance(); + configureMocks(); + org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); Job mockJob = mock(Job.class); when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); - AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); when(appCtx.getClock()).thenReturn(clock); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - Dispatcher dispatcher = mock(Dispatcher.class); - @SuppressWarnings("unchecked") - EventHandler ea = mock(EventHandler.class); - when(dispatcher.getEventHandler()).thenReturn(ea); - when(appCtx.getEventHandler()).thenReturn(ea); - CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); - policy.init(appCtx); - TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl( + + listener = new MockTaskAttemptListenerImpl( appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { @@ -300,44 +329,29 @@ public class TestTaskAttemptListenerImpl { verify(mockTask, never()).canCommit(any(TaskAttemptId.class)); // verify commit allowed when RM heartbeat is recent - when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime()); + when(rmHeartbeatHandler.getLastHeartbeatTime()) + .thenReturn(clock.getTime()); canCommit = listener.canCommit(tid); assertTrue(canCommit); verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class)); - - listener.stop(); } @Test public void testCheckpointIDTracking() throws IOException, InterruptedException{ - SystemClock clock = SystemClock.getInstance(); + configureMocks(); + org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); Job mockJob = mock(Job.class); when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); - - Dispatcher dispatcher = mock(Dispatcher.class); - @SuppressWarnings("unchecked") - EventHandler ea = mock(EventHandler.class); - when(dispatcher.getEventHandler()).thenReturn(ea); - - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - - AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); when(appCtx.getClock()).thenReturn(clock); - when(appCtx.getEventHandler()).thenReturn(ea); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - when(appCtx.getEventHandler()).thenReturn(ea); - CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); - policy.init(appCtx); - TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl( + + listener = new MockTaskAttemptListenerImpl( appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { @@ -387,42 +401,13 @@ public class TestTaskAttemptListenerImpl { //assert it worked assert outcid == incid; - - listener.stop(); - } - @SuppressWarnings("rawtypes") @Test public void testStatusUpdateProgress() throws IOException, InterruptedException { - AppContext appCtx = mock(AppContext.class); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - Dispatcher dispatcher = mock(Dispatcher.class); - @SuppressWarnings("unchecked") - EventHandler ea = mock(EventHandler.class); - when(dispatcher.getEventHandler()).thenReturn(ea); - - when(appCtx.getEventHandler()).thenReturn(ea); - CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); - policy.init(appCtx); - MockTaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, - rmHeartbeatHandler, hbHandler, policy); - Configuration conf = new Configuration(); - listener.init(conf); - listener.start(); - JVMId id = new JVMId("foo",1, true, 1); - WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); - - TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1); - TaskAttemptId attemptId = TypeConverter.toYarn(attemptID); - Task task = mock(Task.class); - listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptId, wid); + configureMocks(); + startListener(true); verify(hbHandler).register(attemptId); // make sure a ping doesn't report progress @@ -437,6 +422,116 @@ public class TestTaskAttemptListenerImpl { feedback = listener.statusUpdate(attemptID, mockStatus); assertTrue(feedback.getTaskFound()); verify(hbHandler).progressing(eq(attemptId)); - listener.close(); + } + + @Test + public void testSingleStatusUpdate() + throws IOException, InterruptedException { + configureMocks(); + startListener(true); + + listener.statusUpdate(attemptID, firstReduceStatus); + + verify(ea).handle(eventCaptor.capture()); + TaskAttemptStatusUpdateEvent updateEvent = + (TaskAttemptStatusUpdateEvent) eventCaptor.getValue(); + + TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get(); + assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1)); + assertEquals(1, status.fetchFailedMaps.size()); + assertEquals(Phase.SHUFFLE, status.phase); + } + + @Test + public void testStatusUpdateEventCoalescing() + throws IOException, InterruptedException { + configureMocks(); + startListener(true); + + listener.statusUpdate(attemptID, firstReduceStatus); + listener.statusUpdate(attemptID, secondReduceStatus); + + verify(ea).handle(any(Event.class)); + ConcurrentMap> attemptIdToStatus = + listener.getAttemptIdToStatus(); + TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get(); + + assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1)); + assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2)); + assertEquals(2, status.fetchFailedMaps.size()); + assertEquals(Phase.SORT, status.phase); + } + + @Test + public void testCoalescedStatusUpdatesCleared() + throws IOException, InterruptedException { + // First two events are coalesced, the third is not + configureMocks(); + startListener(true); + + listener.statusUpdate(attemptID, firstReduceStatus); + listener.statusUpdate(attemptID, secondReduceStatus); + ConcurrentMap> attemptIdToStatus = + listener.getAttemptIdToStatus(); + attemptIdToStatus.get(attemptId).set(null); + listener.statusUpdate(attemptID, thirdReduceStatus); + + verify(ea, times(2)).handle(eventCaptor.capture()); + TaskAttemptStatusUpdateEvent updateEvent = + (TaskAttemptStatusUpdateEvent) eventCaptor.getValue(); + + TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get(); + assertNull(status.fetchFailedMaps); + assertEquals(Phase.REDUCE, status.phase); + } + + @Test(expected = IllegalStateException.class) + public void testStatusUpdateFromUnregisteredTask() + throws IOException, InterruptedException{ + configureMocks(); + startListener(false); + + listener.statusUpdate(attemptID, firstReduceStatus); + } + + private void configureMocks() { + firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE, + new Counters()); + firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID)); + + secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT, + new Counters()); + secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID)); + + thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", + TaskStatus.Phase.REDUCE, new Counters()); + + when(dispatcher.getEventHandler()).thenReturn(ea); + when(appCtx.getEventHandler()).thenReturn(ea); + policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + listener = new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler, policy); + id = new JVMId("foo", 1, true, 1); + wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1); + attemptId = TypeConverter.toYarn(attemptID); + } + + private void startListener(boolean registerTask) { + Configuration conf = new Configuration(); + + listener.init(conf); + listener.start(); + + if (registerTask) { + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptId, wid); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java index cb2a29e390..67a89014ea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; @@ -442,7 +443,7 @@ public class TestFetchFailure { status.stateString = "OK"; status.taskState = attempt.getState(); TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(), - status); + new AtomicReference<>(status)); app.getContext().getEventHandler().handle(event); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java index 77f9a092a1..ca3c28cbaf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -103,7 +104,8 @@ public class TestMRClientService { taskAttemptStatus.phase = Phase.MAP; // send the status update app.getContext().getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus)); + new TaskAttemptStatusUpdateEvent(attempt.getID(), + new AtomicReference<>(taskAttemptStatus))); //verify that all object are fully populated by invoking RPCs. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index e8003c0be7..de171c7512 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -84,7 +85,8 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } } @@ -155,7 +157,8 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } } @@ -180,7 +183,8 @@ public class TestSpeculativeExecutionWithMRApp { TaskAttemptState.RUNNING); speculatedTask = task.getValue(); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } } @@ -195,7 +199,8 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } }