diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b9fe77109a..c13fa4fa10 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -193,6 +193,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv) + MAPREDUCE-3569. TaskAttemptListener holds a global lock for all + task-updates. (Vinod Kumar Vavilapalli via sseth) + BUG FIXES MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index ba0068098e..0e6e3eed04 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -19,14 +19,12 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,21 +62,22 @@ * This class HAS to be in this package to access package private * methods/classes. 
*/ +@SuppressWarnings({"unchecked" , "deprecation"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { + private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true); + private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class); private AppContext context; private Server server; protected TaskHeartbeatHandler taskHeartbeatHandler; private InetSocketAddress address; - private Map jvmIDToActiveAttemptMap = - Collections.synchronizedMap(new HashMap()); + private ConcurrentMap + jvmIDToActiveAttemptMap + = new ConcurrentHashMap(); private JobTokenSecretManager jobTokenSecretManager = null; - private Set pendingJvms = - Collections.synchronizedSet(new HashSet()); public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager) { @@ -123,10 +122,9 @@ protected void startRpcServer() { server.start(); InetSocketAddress listenerAddress = server.getListenerAddress(); - this.address = - NetUtils.createSocketAddr(listenerAddress.getAddress() - .getLocalHost().getCanonicalHostName() - + ":" + listenerAddress.getPort()); + listenerAddress.getAddress(); + this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost() + .getCanonicalHostName() + ":" + listenerAddress.getPort()); } catch (IOException e) { throw new YarnException(e); } @@ -408,57 +406,59 @@ public JvmTask getTask(JvmContext context) throws IOException { WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, jvmId.getId()); - synchronized(this) { - if(pendingJvms.contains(wJvmID)) { - org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID); - if (task != null) { //there may be lag in the attempt getting added here - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - jvmTask = new JvmTask(task, false); - //remove the task as it is no more needed and free up the memory - //Also we have already told the JVM to process a task, so 
it is no - //longer pending, and further request should ask it to exit. - pendingJvms.remove(wJvmID); - jvmIDToActiveAttemptMap.remove(wJvmID); - } - } else { - LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); - jvmTask = new JvmTask(null, true); - } + // Try to look up the task. We remove it directly as we don't give + // multiple tasks to a JVM + org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap + .remove(wJvmID); + if (task != null) { + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + + // remove the task as it is no more needed and free up the memory + // Also we have already told the JVM to process a task, so it is no + // longer pending, and further request should ask it to exit. + } else { + LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); + jvmTask = TASK_FOR_INVALID_JVM; } return jvmTask; } @Override - public synchronized void registerPendingTask(WrappedJvmID jvmID) { - //Save this JVM away as one that has not been handled yet - pendingJvms.add(jvmID); + public void registerPendingTask( + org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { + // Create the mapping so that it is easy to look up + // when the jvm comes back to ask for Task. + + // A JVM not present in this map is an illegal task/JVM. + jvmIDToActiveAttemptMap.put(jvmID, task); } @Override public void registerLaunchedTask( - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { - synchronized(this) { - //create the mapping so that it is easy to look up - //when it comes back to ask for Task. 
- jvmIDToActiveAttemptMap.put(jvmID, task); - //This should not need to happen here, but just to be on the safe side - if(!pendingJvms.add(jvmID)) { - LOG.warn(jvmID+" launched without first being registered"); - } - } - //register this attempt + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) { + + // The task is launched. Register this for expiry-tracking. + + // Timing can cause this to happen after the real JVM launches and gets a + // task, which is still fine as we will only be tracking for expiry a little + // later than usual. taskHeartbeatHandler.register(attemptID); } @Override - public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + public void unregister( + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, WrappedJvmID jvmID) { - //remove the mapping if not already removed + + // Unregistration also comes from the same TaskAttempt which does the + // registration. Events are ordered at TaskAttempt, so unregistration will + // always come after registration. 
+ + // remove the mapping if not already removed jvmIDToActiveAttemptMap.remove(jvmID); - //remove the pending if not already removed - pendingJvms.remove(jvmID); + //unregister this attempt taskHeartbeatHandler.unregister(attemptID); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java index b5e5cd37b2..7002e69d52 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java @@ -32,20 +32,21 @@ public interface TaskAttemptListener { InetSocketAddress getAddress(); /** - * register a JVM with the listener. This should be called as soon as a + * Register a JVM with the listener. This should be called as soon as a * JVM ID is assigned to a task attempt, before it has been launched. + * @param task the task itself for this JVM. * @param jvmID The ID of the JVM . */ - void registerPendingTask(WrappedJvmID jvmID); + void registerPendingTask(Task task, WrappedJvmID jvmID); /** - * Register the task and task attempt with the JVM. This should be called - * when the JVM has been launched. - * @param attemptID the id of the attempt for this JVM. - * @param task the task itself for this JVM. - * @param jvmID the id of the JVM handling the task. + * Register task attempt. This should be called when the JVM has been + * launched. + * + * @param attemptID + * the id of the attempt for this JVM. 
*/ - void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); + void registerLaunchedTask(TaskAttemptId attemptID); /** * Unregister the JVM and the attempt associated with it. This should be diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index e8689d3346..4655895dbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1109,7 +1109,8 @@ public void transition(final TaskAttemptImpl taskAttempt, taskAttempt.jvmID = new WrappedJvmID( taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); - taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID); + taskAttempt.taskAttemptListener.registerPendingTask( + taskAttempt.remoteTask, taskAttempt.jvmID); //launch the container //create the container object to be launched for a given Task attempt @@ -1198,10 +1199,9 @@ public void transition(TaskAttemptImpl taskAttempt, taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); - // register it to TaskAttemptListener so that it start listening - // for it - taskAttempt.taskAttemptListener.registerLaunchedTask( - taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); + // register it to TaskAttemptListener so that it can start monitoring it. 
+ taskAttempt.taskAttemptListener + .registerLaunchedTask(taskAttempt.attemptId); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index a5756da993..f26091ac64 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.IOException; @@ -68,33 +71,47 @@ public void testGetTask() throws IOException { JVMId id = new JVMId("foo",1, true, 1); WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. 
JvmContext context = new JvmContext(); context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - - //Now register the JVM, and see - listener.registerPendingTask(wid); - result = listener.getTask(context); - assertNull(result); - + + // Verify ask after registration but before launch TaskAttemptId attemptID = mock(TaskAttemptId.class); Task task = mock(Task.class); //Now put a task with the ID - listener.registerLaunchedTask(attemptID, task, wid); + listener.registerPendingTask(task, wid); + result = listener.getTask(context); + assertNotNull(result); + assertFalse(result.shouldDie); + // Unregister for more testing. + listener.unregister(attemptID, wid); + + // Verify ask after registration and launch + //Now put a task with the ID + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptID); verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); - + // Don't unregister yet for more testing. + //Verify that if we call it again a second time we are told to die. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - + listener.unregister(attemptID, wid); + + // Verify after unregistration. 
+ result = listener.getTask(context); + assertNotNull(result); + assertTrue(result.shouldDie); + listener.stop(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index b4b9602b81..f17bf6f8af 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -91,6 +91,7 @@ * Mock MRAppMaster. Doesn't start RPC servers. * No threads are started except of the event Dispatcher thread. */ +@SuppressWarnings("unchecked") public class MRApp extends MRAppMaster { private static final Log LOG = LogFactory.getLog(MRApp.class); @@ -323,13 +324,13 @@ public InetSocketAddress getAddress() { return NetUtils.createSocketAddr("localhost:54321"); } @Override - public void registerLaunchedTask(TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {} + public void registerLaunchedTask(TaskAttemptId attemptID) {} @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } @Override - public void registerPendingTask(WrappedJvmID jvmID) { + public void registerPendingTask(org.apache.hadoop.mapred.Task task, + WrappedJvmID jvmID) { } }; } @@ -357,7 +358,6 @@ protected class MockContainerLauncher implements ContainerLauncher { public MockContainerLauncher() { } - @SuppressWarnings("unchecked") @Override public void handle(ContainerLauncherEvent event) { switch (event.getType()) {