From 6957745c2c73cae038ac7960115ffc32de05b953 Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe <jlowe@apache.org>
Date: Wed, 23 Apr 2014 21:53:06 +0000
Subject: [PATCH] MAPREDUCE-5841. uber job doesn't terminate on getting mapred
 job kill. Contributed by Sangjin Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589524 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../hadoop/mapred/LocalContainerLauncher.java | 199 +++++++++++-------
 .../mapred/TestLocalContainerLauncher.java    | 144 +++++++++++++
 3 files changed, 270 insertions(+), 76 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 1a3b437c5a..12f5618e00 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -221,6 +221,9 @@ Release 2.4.1 - UNRELEASED
     MAPREDUCE-5832. Fixed TestJobClient to not fail on JDK7 or on Windows.
     (Jian He and Vinod Kumar Vavilapalli via vinodkv)
 
+    MAPREDUCE-5841. uber job doesn't terminate on getting mapred job kill
+    (Sangjin Lee via jlowe)
+
 Release 2.4.0 - 2014-04-07
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 578cdcdb91..a21b3d5b28 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -24,6 +24,10 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -51,11 +55,13 @@
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Runs the container task locally in a thread.
 * Since all (sub)tasks share the same local directory, they must be executed
@@ -71,7 +77,8 @@ public class LocalContainerLauncher extends AbstractService implements
   private final HashSet<File> localizedFiles;
   private final AppContext context;
   private final TaskUmbilicalProtocol umbilical;
-  private Thread eventHandlingThread;
+  private ExecutorService taskRunner;
+  private Thread eventHandler;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -115,14 +122,24 @@ public LocalContainerLauncher(AppContext context,
   }
 
   public void serviceStart() throws Exception {
-    eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
-    eventHandlingThread.start();
+    // create a single thread for serial execution of tasks
+    // make it a daemon thread so that the process can exit even if the task is
+    // not interruptible
+    taskRunner =
+        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+            setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
+    // create and start an event handling thread
+    eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
+    eventHandler.start();
     super.serviceStart();
   }
 
   public void serviceStop() throws Exception {
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
+    if (eventHandler != null) {
+      eventHandler.interrupt();
+    }
+    if (taskRunner != null) {
+      taskRunner.shutdownNow();
     }
     super.serviceStop();
   }
@@ -158,12 +175,15 @@ public void handle(ContainerLauncherEvent event) {
    *   - runs Task (runSubMap() or runSubReduce())
    *   - TA can safely send TA_UPDATE since in RUNNING state
    */
-  private class SubtaskRunner implements Runnable {
+  private class EventHandler implements Runnable {
 
-    private boolean doneWithMaps = false;
-    private int finishedSubMaps = 0;
+    private volatile boolean doneWithMaps = false;
+    private volatile int finishedSubMaps = 0;
 
-    SubtaskRunner() {
+    private final Map<TaskAttemptId, Future<?>> futures =
+        new ConcurrentHashMap<TaskAttemptId, Future<?>>();
+
+    EventHandler() {
     }
 
     @SuppressWarnings("unchecked")
@@ -172,7 +192,7 @@ public void run() {
       ContainerLauncherEvent event = null;
 
       // Collect locations of map outputs to give to reduces
-      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+      final Map<TaskAttemptID, MapOutputFile> localMapFiles =
           new HashMap<TaskAttemptID, MapOutputFile>();
 
       // _must_ either run subtasks sequentially or accept expense of new JVMs
@@ -183,81 +203,41 @@ public void run() {
           event = eventQueue.take();
         } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
           LOG.error("Returning, interrupted : " + e);
-          return;
+          break;
         }
 
         LOG.info("Processing the event " + event.toString());
 
         if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
 
-          ContainerRemoteLaunchEvent launchEv =
+          final ContainerRemoteLaunchEvent launchEv =
               (ContainerRemoteLaunchEvent)event;
-          TaskAttemptId attemptID = launchEv.getTaskAttemptID();
-
-          Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
-          int numMapTasks = job.getTotalMaps();
-          int numReduceTasks = job.getTotalReduces();
-
-          // YARN (tracking) Task:
-          org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
-              job.getTask(attemptID.getTaskId());
-          // classic mapred Task:
-          org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
-
-          // after "launching," send launched event to task attempt to move
-          // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
-          // do getRemoteTask() call first)
-
-          //There is no port number because we are not really talking to a task
-          // tracker. The shuffle is just done through local files. So the
-          // port number is set to -1 in this case.
-          context.getEventHandler().handle(
-              new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-
-          if (numMapTasks == 0) {
-            doneWithMaps = true;
-          }
-
-          try {
-            if (remoteTask.isMapOrReduce()) {
-              JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
-              if (remoteTask.isMapTask()) {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
-              } else {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
-              }
-              context.getEventHandler().handle(jce);
+          // execute the task on a separate thread
+          Future<?> future = taskRunner.submit(new Runnable() {
+            public void run() {
+              runTask(launchEv, localMapFiles);
             }
-            runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                       (numReduceTasks > 0), localMapFiles);
-
-          } catch (RuntimeException re) {
-            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
-            context.getEventHandler().handle(jce);
-            // this is our signal that the subtask failed in some way, so
-            // simulate a failed JVM/container and send a container-completed
-            // event to task attempt (i.e., move state machine from RUNNING
-            // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
-            context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
-                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-          } catch (IOException ioe) {
-            // if umbilical itself barfs (in error-handler of runSubMap()),
-            // we're pretty much hosed, so do what YarnChild main() does
-            // (i.e., exit clumsily--but can never happen, so no worries!)
-            LOG.fatal("oopsie... this can never happen: "
-                + StringUtils.stringifyException(ioe));
-            System.exit(-1);
-          }
+          });
+          // remember the current attempt
+          futures.put(event.getTaskAttemptID(), future);
 
         } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
 
-          // no container to kill, so just send "cleaned" event to task attempt
-          // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
-          // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+          // cancel (and interrupt) the current running task associated with the
+          // event
+          TaskAttemptId taId = event.getTaskAttemptID();
+          Future<?> future = futures.remove(taId);
+          if (future != null) {
+            LOG.info("canceling the task attempt " + taId);
+            future.cancel(true);
+          }
+
+          // send "cleaned" event to task attempt to move us from
+          // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or
+          // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
           context.getEventHandler().handle(
-              new TaskAttemptEvent(event.getTaskAttemptID(),
+              new TaskAttemptEvent(taId,
                   TaskAttemptEventType.TA_CONTAINER_CLEANED));
 
         } else {
@@ -267,7 +247,75 @@ public void run() {
       }
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("unchecked")
+    private void runTask(ContainerRemoteLaunchEvent launchEv,
+        Map<TaskAttemptID, MapOutputFile> localMapFiles) {
+      TaskAttemptId attemptID = launchEv.getTaskAttemptID();
+
+      Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+      int numMapTasks = job.getTotalMaps();
+      int numReduceTasks = job.getTotalReduces();
+
+      // YARN (tracking) Task:
+      org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+          job.getTask(attemptID.getTaskId());
+      // classic mapred Task:
+      org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+      // after "launching," send launched event to task attempt to move
+      // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+      // do getRemoteTask() call first)
+
+      //There is no port number because we are not really talking to a task
+      // tracker. The shuffle is just done through local files. So the
+      // port number is set to -1 in this case.
+      context.getEventHandler().handle(
+          new TaskAttemptContainerLaunchedEvent(attemptID, -1));
+
+      if (numMapTasks == 0) {
+        doneWithMaps = true;
+      }
+
+      try {
+        if (remoteTask.isMapOrReduce()) {
+          JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+          if (remoteTask.isMapTask()) {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+          } else {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+          }
+          context.getEventHandler().handle(jce);
+        }
+        runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+            (numReduceTasks > 0), localMapFiles);
+
+      } catch (RuntimeException re) {
+        JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+        context.getEventHandler().handle(jce);
+        // this is our signal that the subtask failed in some way, so
+        // simulate a failed JVM/container and send a container-completed
+        // event to task attempt (i.e., move state machine from RUNNING
+        // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+        context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+      } catch (IOException ioe) {
+        // if umbilical itself barfs (in error-handler of runSubMap()),
+        // we're pretty much hosed, so do what YarnChild main() does
+        // (i.e., exit clumsily--but can never happen, so no worries!)
+        LOG.fatal("oopsie... this can never happen: "
+            + StringUtils.stringifyException(ioe));
+        ExitUtil.terminate(-1);
+      } finally {
+        // remove my future
+        if (futures.remove(attemptID) != null) {
+          LOG.info("removed attempt " + attemptID
+              + " from the futures to keep track of");
+        }
+      }
+    }
+
     private void runSubtask(org.apache.hadoop.mapred.Task task,
                             final TaskType taskType,
                             TaskAttemptId attemptID,
@@ -397,7 +445,6 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
      * filenames instead of "file.out". (All of this is entirely internal,
      * so there are no particular compatibility issues.)
      */
-    @SuppressWarnings("deprecation")
     private MapOutputFile renameMapOutputForReduce(JobConf conf,
         TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
@@ -456,7 +503,7 @@ private void relocalize() {
       }
     }
 
-  } // end SubtaskRunner
+  } // end EventHandler
 
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
new file mode 100644
index 0000000000..9a0662ee2c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestLocalContainerLauncher {
+  private static final Log LOG =
+      LogFactory.getLog(TestLocalContainerLauncher.class);
+
+  @SuppressWarnings("rawtypes")
+  @Test(timeout=10000)
+  public void testKillJob() throws Exception {
+    JobConf conf = new JobConf();
+    AppContext context = mock(AppContext.class);
+    // a simple event handler solely to detect the container cleaned event
+    final CountDownLatch isDone = new CountDownLatch(1);
+    EventHandler handler = new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        LOG.info("handling event " + event.getClass()
+            + " with type " + event.getType());
+        if (event instanceof TaskAttemptEvent) {
+          if (event.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
+            isDone.countDown();
+          }
+        }
+      }
+    };
+    when(context.getEventHandler()).thenReturn(handler);
+
+    // create and start the launcher
+    LocalContainerLauncher launcher =
+        new LocalContainerLauncher(context, mock(TaskUmbilicalProtocol.class));
+    launcher.init(conf);
+    launcher.start();
+
+    // create mocked job, task, and task attempt
+    // a single-mapper job
+    JobId jobId = MRBuilderUtils.newJobId(System.currentTimeMillis(), 1, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId taId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+
+    Job job = mock(Job.class);
+    when(job.getTotalMaps()).thenReturn(1);
+    when(job.getTotalReduces()).thenReturn(0);
+    Map<JobId, Job> jobs = new HashMap<JobId, Job>();
+    jobs.put(jobId, job);
+    // app context returns the one and only job
+    when(context.getAllJobs()).thenReturn(jobs);
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(ytask.getType()).thenReturn(TaskType.MAP);
+    when(job.getTask(taskId)).thenReturn(ytask);
+
+    // create a sleeping mapper that runs beyond the test timeout
+    MapTask mapTask = mock(MapTask.class);
+    when(mapTask.isMapOrReduce()).thenReturn(true);
+    when(mapTask.isMapTask()).thenReturn(true);
+    TaskAttemptID taskID = TypeConverter.fromYarn(taId);
+    when(mapTask.getTaskID()).thenReturn(taskID);
+    when(mapTask.getJobID()).thenReturn(taskID.getJobID());
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // sleep for a long time
+        LOG.info("sleeping for 5 minutes...");
+        Thread.sleep(5*60*1000);
+        return null;
+      }
+    }).when(mapTask).run(isA(JobConf.class), isA(TaskUmbilicalProtocol.class));
+
+    // pump in a task attempt launch event
+    ContainerLauncherEvent launchEvent =
+        new ContainerRemoteLaunchEvent(taId, null, createMockContainer(), mapTask);
+    launcher.handle(launchEvent);
+
+    Thread.sleep(200);
+    // now pump in a container clean-up event
+    ContainerLauncherEvent cleanupEvent =
+        new ContainerLauncherEvent(taId, null, null, null,
+            ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP);
+    launcher.handle(cleanupEvent);
+
+    // wait for the event to fire: this should be received promptly
+    isDone.await();
+
+    launcher.close();
+  }
+
+  private static Container createMockContainer() {
+    Container container = mock(Container.class);
+    NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234);
+    when(container.getNodeId()).thenReturn(nodeId);
+    return container;
+  }
+}
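
Note: the essential pattern in the LocalContainerLauncher change above is the split between a queue-draining event thread and a single-threaded daemon task executor, with each launched attempt's Future tracked so that a later CONTAINER_REMOTE_CLEANUP (e.g. from "mapred job -kill") can cancel the in-flight subtask with interruption instead of waiting behind it. The following is a minimal, self-contained sketch of that pattern; the class and method names (CancellableTaskRunner, launch, kill) are illustrative inventions, not Hadoop APIs, and only java.util.concurrent is assumed.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public class CancellableTaskRunner {

  // single worker thread, marked daemon so the JVM can still exit even if
  // a task swallows the interrupt and keeps running
  private final ExecutorService worker =
      Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r, "worker");
          t.setDaemon(true);
          return t;
        }
      });

  // tracks the in-flight task per attempt id so a later kill can find it
  private final Map<String, Future<?>> futures =
      new ConcurrentHashMap<String, Future<?>>();

  public void launch(String id, Runnable task) {
    futures.put(id, worker.submit(task));
  }

  public void kill(String id) {
    Future<?> future = futures.remove(id);
    if (future != null) {
      future.cancel(true); // interrupts the worker thread if still running
    }
  }

  public static void main(String[] args) throws Exception {
    CancellableTaskRunner runner = new CancellableTaskRunner();
    runner.launch("attempt_0", new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(5 * 60 * 1000); // stand-in for a long subtask
        } catch (InterruptedException ie) {
          System.out.println("task interrupted, stopping");
        }
      }
    });
    Thread.sleep(200);        // let the task start
    runner.kill("attempt_0"); // returns promptly; the sleep is interrupted
    Thread.sleep(200);        // give the task a moment to log and unwind
  }
}

Unlike this sketch, the patch also removes each attempt's future in a finally block inside runTask, so tasks that complete normally do not leave stale entries in the map, and the daemon flag ensures a stuck, non-interruptible task cannot keep the process alive.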