MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1309037 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-04-03 16:59:26 +00:00
parent 00eb28d927
commit 2b5c36b695
5 changed files with 127 additions and 161 deletions

View File

@ -226,6 +226,8 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4012 Hadoop Job setup error leaves no useful info to users
(when LinuxTaskController is used). (tgraves)
MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -30,11 +30,4 @@ enum EventType {
CONTAINER_REMOTE_CLEANUP
}
// Not a documented config. Only used for tests
static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
+ "nm-command-timeout";
/**
* Maximum of 1 minute timeout for a Node to react to the command
*/
static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000;
}

View File

@ -23,8 +23,6 @@
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@ -72,8 +70,6 @@ public class ContainerLauncherImpl extends AbstractService implements
static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
int nmTimeOut;
private ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
private AppContext context;
@ -83,7 +79,6 @@ public class ContainerLauncherImpl extends AbstractService implements
private Thread eventHandlingThread;
protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
final Timer commandTimer = new Timer(true);
YarnRPC rpc;
private Container getContainer(ContainerId id) {
@ -130,30 +125,18 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
"Container was killed before it was launched");
return;
}
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
ContainerManager proxy = null;
try {
commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
// Interrupted during getProxy, but that didn't throw exception
if (Thread.interrupted()) {
// The timer canceled the command in the mean while.
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
}
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
event.getContainer();
@ -164,19 +147,6 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);
// container started properly. Stop the timer
timerTask.cancel();
if (Thread.interrupted()) {
// The timer canceled the command in the mean while, but
// startContainer didn't throw exception
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
}
ByteBuffer portInfo = response
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
@ -198,17 +168,11 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer canceled the command in the mean while.
LOG.info("Start-container for " + event.getContainerID()
+ " got interrupted.");
}
String message = "Container launch failed for " + containerID + " : "
+ StringUtils.stringifyException(t);
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
timerTask.cancel();
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
@ -220,41 +184,24 @@ public synchronized void kill(ContainerLauncherEvent event) {
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("KILLING " + taskAttemptID);
commandTimer.schedule(timerTask, nmTimeOut);
ContainerManager proxy = null;
try {
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
if (Thread.interrupted()) {
// The timer canceled the command in the mean while. No need to
// return, send cleaned up event anyways.
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
} else {
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID());
proxy.stopContainer(stopRequest);
}
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer canceled the command in the mean while, clear the
// interrupt flag
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
}
} catch (Throwable t) {
// ignore the cleanup failure
String message = "cleanup failed for container "
@ -264,15 +211,6 @@ public synchronized void kill(ContainerLauncherEvent event) {
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
LOG.warn(message);
} finally {
timerTask.cancel();
if (Thread.interrupted()) {
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
// ignore the cleanup failure
context.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
"cleanup failed for container " + event.getContainerID()));
}
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
@ -303,8 +241,6 @@ public synchronized void init(Configuration config) {
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
this.rpc = createYarnRPC(conf);
super.init(conf);
}
@ -409,44 +345,6 @@ public ContainerManager run() {
return proxy;
}
private static class CommandTimerTask extends TimerTask {
private final Thread commandThread;
protected final String message;
private boolean cancelled = false;
public CommandTimerTask(Thread thread, ContainerLauncherEvent event) {
super();
this.commandThread = thread;
this.message = "Couldn't complete " + event.getType() + " on "
+ event.getContainerID() + "/" + event.getTaskAttemptID()
+ ". Interrupting and returning";
}
@Override
public void run() {
synchronized (this) {
if (this.cancelled) {
return;
}
LOG.warn(this.message);
StackTraceElement[] trace = this.commandThread.getStackTrace();
StringBuilder logMsg = new StringBuilder();
for (int i = 0; i < trace.length; i++) {
logMsg.append("\n\tat " + trace[i]);
}
LOG.info("Stack trace of the command-thread: \n" + logMsg.toString());
this.commandThread.interrupt();
}
}
@Override
public boolean cancel() {
synchronized (this) {
this.cancelled = true;
return super.cancel();
}
}
}
/**
* Setup and start the container on remote nodemanager.

View File

@ -21,6 +21,8 @@
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@ -30,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -44,18 +47,39 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestContainerLauncher {
static final Log LOG = LogFactory
.getLog(TestContainerLauncher.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
Configuration conf;
Server server;
static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
@Test
public void testPoolSize() throws InterruptedException {
@ -104,10 +128,10 @@ public void testPoolSize() throws InterruptedException {
Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
containerLauncher.finishEventHandling = false;
for (int i = 0; i < 10; i++) {
ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, i + 10);
TaskAttemptId taskAttemptId =
MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId,
i + 10);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId,
i + 10);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@ -119,8 +143,7 @@ public void testPoolSize() throws InterruptedException {
// Different hosts, there should be an increase in core-thread-pool size to
// 21(11hosts+10buffer)
// Core pool size should be 21 but the live pool size should be only 11.
containerLauncher.expectedCorePoolSize =
11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = false;
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
@ -200,26 +223,28 @@ private void waitForEvents(CustomContainerLauncher containerLauncher,
@Test
public void testSlowNM() throws Exception {
test(false);
test();
}
@Test
public void testSlowNMWithInterruptsSwallowed() throws Exception {
test(true);
}
private void test() throws Exception {
private void test(boolean swallowInterrupts) throws Exception {
MRApp app = new MRAppWithSlowNM(swallowInterrupts);
Configuration conf = new Configuration();
conf = new Configuration();
int maxAttempts = 1;
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
// set timeout low for the test
conf.setInt("yarn.rpc.nm-command-timeout", 3000);
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
server = rpc.getServer(ContainerManager.class, new DummyContainerManager(),
addr, conf, null, 1);
server.start();
// Set low timeout for NM commands
conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
MRApp app = new MRAppWithSlowNM();
try {
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
@ -231,8 +256,8 @@ private void test(boolean swallowInterrupts) throws Exception {
Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
.next().getAttempts();
Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
.size());
Assert.assertEquals("Num attempts is not correct", maxAttempts,
attempts.size());
TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.ASSIGNED);
@ -241,20 +266,18 @@ private void test(boolean swallowInterrupts) throws Exception {
String diagnostics = attempt.getDiagnostics().toString();
LOG.info("attempt.getDiagnostics: " + diagnostics);
if (swallowInterrupts) {
Assert.assertEquals("[Container launch failed for "
+ "container_0_0000_01_000000 : Start-container for "
+ "container_0_0000_01_000000 got interrupted. Returning.]",
diagnostics);
} else {
Assert.assertTrue(diagnostics.contains("Container launch failed for "
+ "container_0_0000_01_000000 : "));
Assert.assertTrue(diagnostics
.contains(": java.lang.InterruptedException"));
}
Assert
.assertTrue(diagnostics
.contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel"));
} finally {
server.stop();
app.stop();
}
}
private final class CustomContainerLauncher extends ContainerLauncherImpl {
@ -317,13 +340,10 @@ protected ContainerLauncherImpl.EventProcessor createEventProcessor(
}
}
private static class MRAppWithSlowNM extends MRApp {
private class MRAppWithSlowNM extends MRApp {
final boolean swallowInterrupts;
public MRAppWithSlowNM(boolean swallowInterrupts) {
public MRAppWithSlowNM() {
super(1, 0, false, "TestContainerLauncher", true);
this.swallowInterrupts = swallowInterrupts;
}
@Override
@ -333,20 +353,57 @@ protected ContainerLauncher createContainerLauncher(AppContext context) {
protected ContainerManager getCMProxy(ContainerId containerID,
String containerManagerBindAddr, ContainerToken containerToken)
throws IOException {
try {
synchronized (this) {
wait(); // Just hang the thread simulating a very slow NM.
}
} catch (InterruptedException e) {
LOG.info(e);
if (!MRAppWithSlowNM.this.swallowInterrupts) {
throw new IOException(e);
}
Thread.currentThread().interrupt();
}
return null;
// make proxy connect to our local containerManager server
ContainerManager proxy = (ContainerManager) rpc.getProxy(
ContainerManager.class,
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
return proxy;
}
};
};
}
}
public class DummyContainerManager implements ContainerManager {
private ContainerStatus status = null;
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
GetContainerStatusResponse response = recordFactory
.newRecordInstance(GetContainerStatusResponse.class);
response.setStatus(status);
return response;
}
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
ContainerLaunchContext container = request.getContainerLaunchContext();
StartContainerResponse response = recordFactory
.newRecordInstance(StartContainerResponse.class);
status = recordFactory.newRecordInstance(ContainerStatus.class);
try {
// make the thread sleep to look like its not going to respond
Thread.sleep(15000);
} catch (Exception e) {
LOG.error(e);
throw new UndeclaredThrowableException(e);
}
status.setState(ContainerState.RUNNING);
status.setContainerId(container.getContainerId());
status.setExitStatus(0);
return response;
}
@Override
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
null).createYarnRemoteException(e);
}
}
}

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@ -38,6 +40,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto;
@ -48,12 +51,25 @@
public class ContainerManagerPBClientImpl implements ContainerManager {
// Not a documented config. Only used for tests
static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+ "rpc.nm-command-timeout";
/**
* Maximum of 1 minute timeout for a Node to react to the command
*/
static final int DEFAULT_COMMAND_TIMEOUT = 60000;
private ContainerManagerPB proxy;
public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
proxy = (ContainerManagerPB)RPC.getProxy(
ContainerManagerPB.class, clientVersion, addr, conf);
ContainerManagerPB.class, clientVersion, addr, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), expireIntvl);
}
public void close() {