MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a task-scheduling deadlock. Contributed by Robert Joseph Evans.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195145 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-30 11:29:41 +00:00
parent 64c019cccc
commit 1c8d64f38a
6 changed files with 180 additions and 25 deletions

View File

@ -1862,6 +1862,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3256. Added authorization checks for the protocol between MAPREDUCE-3256. Added authorization checks for the protocol between
NodeManager and ApplicationMaster. (vinodkv via acmurthy) NodeManager and ApplicationMaster. (vinodkv via acmurthy)
MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a
task-scheduling deadlock. (Robert Joseph Evans via vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -23,8 +23,10 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -69,12 +71,14 @@ public class TaskAttemptListenerImpl extends CompositeService
private AppContext context; private AppContext context;
private Server server; private Server server;
private TaskHeartbeatHandler taskHeartbeatHandler; protected TaskHeartbeatHandler taskHeartbeatHandler;
private InetSocketAddress address; private InetSocketAddress address;
private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap = private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap =
Collections.synchronizedMap(new HashMap<WrappedJvmID, Collections.synchronizedMap(new HashMap<WrappedJvmID,
org.apache.hadoop.mapred.Task>()); org.apache.hadoop.mapred.Task>());
private JobTokenSecretManager jobTokenSecretManager = null; private JobTokenSecretManager jobTokenSecretManager = null;
private Set<WrappedJvmID> pendingJvms =
Collections.synchronizedSet(new HashSet<WrappedJvmID>());
public TaskAttemptListenerImpl(AppContext context, public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager) { JobTokenSecretManager jobTokenSecretManager) {
@ -395,35 +399,55 @@ public JvmTask getTask(JvmContext context) throws IOException {
JVMId jvmId = context.jvmId; JVMId jvmId = context.jvmId;
LOG.info("JVM with ID : " + jvmId + " asked for a task"); LOG.info("JVM with ID : " + jvmId + " asked for a task");
// TODO: Is it an authorised container to get a task? Otherwise return null. JvmTask jvmTask = null;
// TODO: Is it an authorized container to get a task? Otherwise return null.
// TODO: Is the request for task-launch still valid?
// TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
// to jobId and task-type. // to jobId and task-type.
WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
jvmId.getId()); jvmId.getId());
org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID); synchronized(this) {
if (task != null) { //there may be lag in the attempt getting added here if(pendingJvms.contains(wJvmID)) {
LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
JvmTask jvmTask = new JvmTask(task, false); if (task != null) { //there may be lag in the attempt getting added here
LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
//remove the task as it is no more needed and free up the memory jvmTask = new JvmTask(task, false);
jvmIDToAttemptMap.remove(wJvmID);
//remove the task as it is no more needed and free up the memory
return jvmTask; //Also we have already told the JVM to process a task, so it is no
//longer pending, and further request should ask it to exit.
pendingJvms.remove(wJvmID);
jvmIDToActiveAttemptMap.remove(wJvmID);
}
} else {
LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
jvmTask = new JvmTask(null, true);
}
} }
return null; return jvmTask;
}
@Override
public synchronized void registerPendingTask(WrappedJvmID jvmID) {
//Save this JVM away as one that has not been handled yet
pendingJvms.add(jvmID);
} }
@Override @Override
public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, public void registerLaunchedTask(
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
//create the mapping so that it is easy to look up synchronized(this) {
//when it comes back to ask for Task. //create the mapping so that it is easy to look up
jvmIDToAttemptMap.put(jvmID, task); //when it comes back to ask for Task.
jvmIDToActiveAttemptMap.put(jvmID, task);
//This should not need to happen here, but just to be on the safe side
if(!pendingJvms.add(jvmID)) {
LOG.warn(jvmID+" launched without first being registered");
}
}
//register this attempt //register this attempt
taskHeartbeatHandler.register(attemptID); taskHeartbeatHandler.register(attemptID);
} }
@ -432,8 +456,9 @@ public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId at
public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
WrappedJvmID jvmID) { WrappedJvmID jvmID) {
//remove the mapping if not already removed //remove the mapping if not already removed
jvmIDToAttemptMap.remove(jvmID); jvmIDToActiveAttemptMap.remove(jvmID);
//remove the pending if not already removed
pendingJvms.remove(jvmID);
//unregister this attempt //unregister this attempt
taskHeartbeatHandler.unregister(attemptID); taskHeartbeatHandler.unregister(attemptID);
} }

View File

@ -24,12 +24,35 @@
import org.apache.hadoop.mapred.WrappedJvmID; import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
/**
* This class listens for changes to the state of a Task.
*/
public interface TaskAttemptListener { public interface TaskAttemptListener {
InetSocketAddress getAddress(); InetSocketAddress getAddress();
void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); /**
* register a JVM with the listener. This should be called as soon as a
* JVM ID is assigned to a task attempt, before it has been launched.
* @param jvmID The ID of the JVM .
*/
void registerPendingTask(WrappedJvmID jvmID);
/**
* Register the task and task attempt with the JVM. This should be called
* when the JVM has been launched.
* @param attemptID the id of the attempt for this JVM.
* @param task the task itself for this JVM.
* @param jvmID the id of the JVM handling the task.
*/
void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
/**
* Unregister the JVM and the attempt associated with it. This should be
* called when the attempt/JVM has finished executing and is being cleaned up.
* @param attemptID the ID of the attempt.
* @param jvmID the ID of the JVM for that attempt.
*/
void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID); void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
} }

View File

@ -1012,6 +1012,7 @@ public void transition(final TaskAttemptImpl taskAttempt,
taskAttempt.jvmID = new WrappedJvmID( taskAttempt.jvmID = new WrappedJvmID(
taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.getTaskID().getJobID(),
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
//launch the container //launch the container
//create the container object to be launched for a given Task attempt //create the container object to be launched for a given Task attempt
@ -1106,7 +1107,7 @@ public void transition(TaskAttemptImpl taskAttempt,
// register it to TaskAttemptListener so that it start listening // register it to TaskAttemptListener so that it start listening
// for it // for it
taskAttempt.taskAttemptListener.register( taskAttempt.taskAttemptListener.registerLaunchedTask(
taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address. //TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = InetSocketAddress nodeHttpInetAddr =

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.junit.Test;
public class TestTaskAttemptListenerImpl {
public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
TaskHeartbeatHandler hbHandler) {
super(context, jobTokenSecretManager);
this.taskHeartbeatHandler = hbHandler;
}
@Override
protected void registerHeartbeatHandler() {
//Empty
}
@Override
protected void startRpcServer() {
//Empty
}
@Override
protected void stopRpcServer() {
//Empty
}
}
@Test
public void testGetTask() throws IOException {
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
JVMId id = new JVMId("foo",1, true, 1);
WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
//The JVM ID has not been registered yet so we should kill it.
JvmContext context = new JvmContext();
context.jvmId = id;
JvmTask result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
//Now register the JVM, and see
listener.registerPendingTask(wid);
result = listener.getTask(context);
assertNull(result);
TaskAttemptId attemptID = mock(TaskAttemptId.class);
Task task = mock(Task.class);
//Now put a task with the ID
listener.registerLaunchedTask(attemptID, task, wid);
verify(hbHandler).register(attemptID);
result = listener.getTask(context);
assertNotNull(result);
assertFalse(result.shouldDie);
//Verify that if we call it again a second time we are told to die.
result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
listener.unregister(attemptID, wid);
listener.stop();
}
}

View File

@ -294,11 +294,14 @@ public InetSocketAddress getAddress() {
return null; return null;
} }
@Override @Override
public void register(TaskAttemptId attemptID, public void registerLaunchedTask(TaskAttemptId attemptID,
org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {} org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
@Override @Override
public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
} }
@Override
public void registerPendingTask(WrappedJvmID jvmID) {
}
}; };
} }