From 5ee495e6f34faff231ad87ec890188eb63617393 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 21 Feb 2012 05:08:25 +0000 Subject: [PATCH 1/3] MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when their EventHandlers get exceptions. (vinodkv) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1291598 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 2 + .../app/launcher/ContainerLauncherEvent.java | 3 +- .../app/launcher/ContainerLauncherImpl.java | 1 - .../v2/app/recover/RecoveryService.java | 12 +- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 1 + .../hadoop/mapreduce/v2/app/TestFail.java | 12 +- .../hadoop/mapreduce/v2/app/TestRecovery.java | 27 ----- .../app/launcher/TestContainerLauncher.java | 103 ++++++++++++------ .../mapreduce/v2/hs/JobHistoryServer.java | 4 + .../mapreduce/v2/hs/TestJobHistoryEvents.java | 2 - .../dev-support/findbugs-exclude.xml | 6 + .../hadoop/yarn/event/AsyncDispatcher.java | 30 ++--- .../apache/hadoop/yarn/event/Dispatcher.java | 10 ++ .../hadoop/yarn/event/DrainDispatcher.java | 4 +- .../yarn/server/nodemanager/NodeManager.java | 2 + .../ContainerManagerImpl.java | 1 + .../application/TestApplication.java | 2 +- .../container/TestContainer.java | 2 +- .../localizer/TestLocalizedResource.java | 18 +-- .../TestResourceLocalizationService.java | 30 +++-- .../resourcemanager/ResourceManager.java | 14 ++- 22 files changed, 173 insertions(+), 117 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c28c138924..602d71b262 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -108,6 +108,7 @@ Release 0.23.2 - UNRELEASED OPTIMIZATIONS BUG FIXES + MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering DeletionService threads (Jason Lowe via bobby) @@ -125,6 +126,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to avoid NumberFormatException caused by overflow. (Zhihong Yu via szetszwo) + + MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when + their EventHandlers get exceptions. (vinodkv) Release 0.23.1 - 2012-02-17 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6c45574b7d..2e17869f79 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -188,6 +188,8 @@ public MRAppMaster(ApplicationAttemptId applicationAttemptId, @Override public void init(final Configuration conf) { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + downloadTokensAndSetupUGI(conf); context = new RunningAppContext(conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java index 00bfc391ae..ce720e1dea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java @@ -61,7 +61,8 @@ public ContainerToken getContainerToken() { @Override public String toString() { - return super.toString() + " for taskAttempt " + taskAttemptID; + return super.toString() + " for container " + containerID + " taskAttempt " + + taskAttemptID; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 1443eed608..0befad8642 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -334,7 +334,6 @@ public void run() { LOG.error("Returning, interrupted : " + e); return; } - int poolSize = launcherPool.getCorePoolSize(); // See if we need up the pool size only if haven't reached the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 3bf6e07584..8e041e890c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -216,26 +216,18 @@ private void parse() throws IOException { protected Dispatcher createRecoveryDispatcher() { return new RecoveryDispatcher(); } - - protected Dispatcher createRecoveryDispatcher(boolean exitOnException) { - return new RecoveryDispatcher(exitOnException); - } @SuppressWarnings("rawtypes") class RecoveryDispatcher extends AsyncDispatcher { private final EventHandler actualHandler; private final EventHandler handler; - RecoveryDispatcher(boolean exitOnException) { - super(exitOnException); + RecoveryDispatcher() { + super(); actualHandler = super.getEventHandler(); handler = new InterceptingEventHandler(actualHandler); } - RecoveryDispatcher() { - this(false); - } - @Override @SuppressWarnings("unchecked") public void dispatch(Event event) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 00bdaebfe8..325adacb30 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 1b35b21559..60ec171c5f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Iterator; import java.util.Map; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerToken; @@ -49,6 +51,7 @@ * Tests the state machine with respect to Job/Task/TaskAttempt failure * scenarios. */ +@SuppressWarnings("unchecked") public class TestFail { @Test @@ -247,10 +250,17 @@ protected TaskAttemptListener createTaskAttemptListener(AppContext context) { //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure return new TaskAttemptListenerImpl(getContext(), null) { + @Override public void startRpcServer(){}; + @Override public void stopRpcServer(){}; + @Override + public InetSocketAddress getAddress() { + return NetUtils.createSocketAddr("localhost", 1234); + } public void init(Configuration conf) { - conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout + conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000); super.init(conf); } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 509c49e9cc..87fce7ece6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -54,12 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; -import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; -import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Test; @@ -724,13 +719,6 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete, super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); } - @Override - protected Recovery createRecoveryService(AppContext appContext) { - return new RecoveryServiceWithCustomDispatcher( - appContext.getApplicationAttemptId(), appContext.getClock(), - getCommitter()); - } - @Override protected ContainerLauncher createContainerLauncher(AppContext context) { MockContainerLauncher launcher = new MockContainerLauncher() { @@ -757,21 +745,6 @@ protected EventHandler createJobHistoryHandler( } } - static class RecoveryServiceWithCustomDispatcher extends RecoveryService { - - public RecoveryServiceWithCustomDispatcher( - ApplicationAttemptId applicationAttemptId, Clock clock, - OutputCommitter committer) { - super(applicationAttemptId, clock, committer); - } - - @Override - public Dispatcher createRecoveryDispatcher() { - return super.createRecoveryDispatcher(false); - } - - } - public static void main(String[] arg) throws Exception { TestRecovery test = new TestRecovery(); test.testCrashed(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index f25de5c8cc..b0ecb5c9bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -64,8 +65,6 @@ public void testPoolSize() throws InterruptedException { appId, 3); JobId jobId = MRBuilderUtils.newJobId(appId, 8); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP); - TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); - ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10); AppContext context = mock(AppContext.class); CustomContainerLauncher containerLauncher = new CustomContainerLauncher( @@ -83,6 +82,8 @@ public void testPoolSize() throws InterruptedException { containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; for (int i = 0; i < 10; i++) { + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i); containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); @@ -92,9 +93,21 @@ public void testPoolSize() throws InterruptedException { Assert.assertNull(containerLauncher.foundErrors); // Same set of hosts, so no change - containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.finishEventHandling = true; + int timeOut = 0; + while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) { + LOG.info("Waiting for number of events processed to become " + 10 + + ". It is now " + containerLauncher.numEventsProcessed.get() + + ". Timeout is " + timeOut); + Thread.sleep(1000); + } + Assert.assertEquals(10, containerLauncher.numEventsProcessed.get()); + containerLauncher.finishEventHandling = false; for (int i = 0; i < 10; i++) { + ContainerId containerId = + BuilderUtils.newContainerId(appAttemptId, i + 10); + TaskAttemptId taskAttemptId = + MRBuilderUtils.newTaskAttemptId(taskId, i + 10); containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); @@ -106,14 +119,16 @@ public void testPoolSize() throws InterruptedException { // Different hosts, there should be an increase in core-thread-pool size to // 21(11hosts+10buffer) // Core pool size should be 21 but the live pool size should be only 11. - containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE; - for (int i = 1; i <= 2; i++) { - containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, - containerId, "host1" + i + ":1234", null, - ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); - } - waitForEvents(containerLauncher, 22); - Assert.assertEquals(12, threadPool.getPoolSize()); + containerLauncher.expectedCorePoolSize = + 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.finishEventHandling = false; + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host11:1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + waitForEvents(containerLauncher, 21); + Assert.assertEquals(11, threadPool.getPoolSize()); Assert.assertNull(containerLauncher.foundErrors); containerLauncher.stop(); @@ -172,15 +187,15 @@ public void testPoolLimits() throws InterruptedException { private void waitForEvents(CustomContainerLauncher containerLauncher, int expectedNumEvents) throws InterruptedException { - int timeOut = 20; - while (expectedNumEvents != containerLauncher.numEventsProcessed - || timeOut++ < 20) { + int timeOut = 0; + while (containerLauncher.numEventsProcessing.get() < expectedNumEvents + && timeOut++ < 20) { LOG.info("Waiting for number of events to become " + expectedNumEvents - + ". It is now " + containerLauncher.numEventsProcessed); + + ". It is now " + containerLauncher.numEventsProcessing.get()); Thread.sleep(1000); } - Assert - .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed); + Assert.assertEquals(expectedNumEvents, + containerLauncher.numEventsProcessing.get()); } @Test @@ -244,9 +259,11 @@ private void test(boolean swallowInterrupts) throws Exception { private final class CustomContainerLauncher extends ContainerLauncherImpl { private volatile int expectedCorePoolSize = 0; - private volatile int numEventsProcessed = 0; + private AtomicInteger numEventsProcessing = new AtomicInteger(0); + private AtomicInteger numEventsProcessed = new AtomicInteger(0); private volatile String foundErrors = null; private volatile boolean finishEventHandling; + private CustomContainerLauncher(AppContext context) { super(context); } @@ -255,8 +272,38 @@ public ThreadPoolExecutor getThreadPool() { return super.launcherPool; } + private final class CustomEventProcessor extends + ContainerLauncherImpl.EventProcessor { + private final ContainerLauncherEvent event; + + private CustomEventProcessor(ContainerLauncherEvent event) { + super(event); + this.event = event; + } + + @Override + public void run() { + // do nothing substantial + + LOG.info("Processing the event " + event.toString()); + + numEventsProcessing.incrementAndGet(); + // Stall + while (!finishEventHandling) { + synchronized (this) { + try { + wait(1000); + } catch (InterruptedException e) { + ; + } + } + } + numEventsProcessed.incrementAndGet(); + } + } + protected ContainerLauncherImpl.EventProcessor createEventProcessor( - ContainerLauncherEvent event) { + final ContainerLauncherEvent event) { // At this point of time, the EventProcessor is being created and so no // additional threads would have been created. @@ -266,23 +313,7 @@ protected ContainerLauncherImpl.EventProcessor createEventProcessor( + launcherPool.getCorePoolSize(); } - return new ContainerLauncherImpl.EventProcessor(event) { - @Override - public void run() { - // do nothing substantial - numEventsProcessed++; - // Stall - synchronized(this) { - try { - while(!finishEventHandling) { - wait(1000); - } - } catch (InterruptedException e) { - ; - } - } - } - }; + return new CustomEventProcessor(event); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index c1626f9531..b420e3dfbe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.service.CompositeService; /****************************************************************** @@ -51,6 +52,9 @@ public JobHistoryServer() { @Override public synchronized void init(Configuration conf) { Configuration config = new YarnConfiguration(conf); + + config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + try { doSecureLogin(conf); } catch(IOException ie) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java index ec2fe5ea3a..58ab9c8117 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java @@ -53,7 +53,6 @@ public class TestJobHistoryEvents { @Test public void testHistoryEvents() throws Exception { Configuration conf = new Configuration(); - conf.set(MRJobConfig.USER_NAME, "test"); MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); @@ -102,7 +101,6 @@ public void testHistoryEvents() throws Exception { public void testEventsFlushOnStop() throws Exception { Configuration conf = new Configuration(); - conf.set(MRJobConfig.USER_NAME, "test"); MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this .getClass().getName(), true); app.submit(conf); diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 4b17c1e943..799e20092d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -193,6 +193,12 @@ + + + + + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index ffa2e9cfc6..1b3a76a477 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.service.AbstractService; @@ -48,22 +49,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private boolean exitOnDispatchException; public AsyncDispatcher() { - this(new HashMap, EventHandler>(), - new LinkedBlockingQueue(), true); - } - - public AsyncDispatcher(boolean exitOnException) { - this(new HashMap, EventHandler>(), - new LinkedBlockingQueue(), exitOnException); + this(new LinkedBlockingQueue()); } - AsyncDispatcher( - Map, EventHandler> eventDispatchers, - BlockingQueue eventQueue, boolean exitOnException) { + public AsyncDispatcher(BlockingQueue eventQueue) { super("Dispatcher"); this.eventQueue = eventQueue; - this.eventDispatchers = eventDispatchers; - this.exitOnDispatchException = exitOnException; + this.eventDispatchers = new HashMap, EventHandler>(); } Runnable createThread() { @@ -86,6 +78,14 @@ public void run() { }; } + @Override + public synchronized void init(Configuration conf) { + this.exitOnDispatchException = + conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); + super.init(conf); + } + @Override public void start() { //start all the components @@ -103,7 +103,7 @@ public void stop() { try { eventHandlingThread.join(); } catch (InterruptedException ie) { - LOG.debug("Interrupted Exception while stopping", ie); + LOG.warn("Interrupted Exception while stopping", ie); } } @@ -126,8 +126,9 @@ protected void dispatch(Event event) { } catch (Throwable t) { //TODO Maybe log the state of the queue - LOG.fatal("Error in dispatcher thread. Exiting..", t); + LOG.fatal("Error in dispatcher thread", t); if (exitOnDispatchException) { + LOG.info("Exiting, bbye.."); System.exit(-1); } } @@ -177,6 +178,7 @@ public void handle(Event event) { try { eventQueue.put(event); } catch (InterruptedException e) { + LOG.warn("AsyncDispatcher thread interrupted", e); throw new YarnException(e); } }; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java index 40c07755a9..f87f1b26c7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java @@ -23,8 +23,18 @@ * event handlers based on event types. * */ +@SuppressWarnings("rawtypes") public interface Dispatcher { + // Configuration to make sure dispatcher crashes but doesn't do system-exit in + // case of errors. By default, it should be false, so that tests are not + // affected. For all daemons it should be explicitly set to true so that + // daemons can crash instead of hanging around. + public static final String DISPATCHER_EXIT_ON_ERROR_KEY = + "yarn.dispatcher.exit-on-error"; + + public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false; + EventHandler getEventHandler(); void register(Class eventType, EventHandler handler); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 20d7dfca94..e79e7b360e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.yarn.event; -import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { @@ -36,7 +34,7 @@ public DrainDispatcher() { } private DrainDispatcher(BlockingQueue eventQueue) { - super(new HashMap, EventHandler>(), eventQueue, true); + super(eventQueue); this.queue = eventQueue; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 439b5e37a5..8f875e117a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -99,6 +99,8 @@ protected void doSecureLogin() throws IOException { @Override public void init(Configuration conf) { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + Context context = new NMContext(); // Create the secretManager if need be. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3169f2f1b8..1eb2283793 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -137,6 +137,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.context = context; this.dirsHandler = dirsHandler; + // ContainerManager level dispatcher. dispatcher = new AsyncDispatcher(); this.deletionService = deletionContext; this.metrics = metrics; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 236489b7f4..4b3736024b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -376,7 +376,7 @@ private class WrappedApplication { WrappedApplication(int id, long timestamp, String user, int numContainers) { dispatcher = new DrainDispatcher(); - dispatcher.init(null); + dispatcher.init(new Configuration()); localizerBus = mock(EventHandler.class); launcherBus = mock(EventHandler.class); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index e4b7aa47a7..bc6ec196e1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -517,7 +517,7 @@ private class WrappedContainer { WrappedContainer(int appId, long timestamp, int id, String user, boolean withLocalRes, boolean withServiceData) { dispatcher = new DrainDispatcher(); - dispatcher.init(null); + dispatcher.init(new Configuration()); localizerBus = mock(EventHandler.class); launcherBus = mock(EventHandler.class); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index a1c6bb8479..07d8df1db6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -29,19 +38,14 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; - import org.junit.Test; -import static org.junit.Assert.*; - import org.mockito.ArgumentMatcher; -import static org.mockito.Mockito.*; public class TestLocalizedResource { @@ -62,7 +66,7 @@ static ContainerId getMockContainer(int id) { @SuppressWarnings("unchecked") // mocked generic public void testNotification() throws Exception { DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(null); + dispatcher.init(new Configuration()); try { dispatcher.start(); EventHandler containerBus = mock(EventHandler.class); @@ -175,7 +179,7 @@ public void testNotification() throws Exception { @Test public void testDirectLocalization() throws Exception { DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(null); + dispatcher.init(new Configuration()); try { dispatcher.start(); LocalResource apiRsrc = createMockResource(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 9886d37c73..84cd7f22b2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -18,8 +18,23 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import java.net.InetSocketAddress; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyShort; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,7 +48,6 @@ import junit.framework.Assert; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FSDataOutputStream; @@ -42,6 +56,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -60,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; -import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; @@ -81,13 +95,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; - import org.junit.Test; -import static org.junit.Assert.*; - import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; -import static org.mockito.Mockito.*; public class TestResourceLocalizationService { @@ -98,11 +108,11 @@ public class TestResourceLocalizationService { public void testLocalizationInit() throws Exception { final Configuration conf = new Configuration(); AsyncDispatcher dispatcher = new AsyncDispatcher(); - dispatcher.init(null); + dispatcher.init(new Configuration()); ContainerExecutor exec = mock(ContainerExecutor.class); DeletionService delService = spy(new DeletionService(exec)); - delService.init(null); + delService.init(new Configuration()); delService.start(); AbstractFileSystem spylfs = @@ -371,7 +381,7 @@ public void testLocalizationHeartbeat() throws Exception { DeletionService delServiceReal = new DeletionService(exec); DeletionService delService = spy(delServiceReal); - delService.init(null); + delService.init(new Configuration()); delService.start(); ResourceLocalizationService rawService = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index dc7d29cdcb..bf01cef005 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -131,6 +131,8 @@ public synchronized void init(Configuration conf) { this.conf = conf; + this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + this.rmDispatcher = createDispatcher(); addIfService(this.rmDispatcher); @@ -265,6 +267,7 @@ public static class SchedulerEventDispatcher extends AbstractService private final BlockingQueue eventQueue = new LinkedBlockingQueue(); private final Thread eventProcessor; + private volatile boolean stopped = false; public SchedulerEventDispatcher(ResourceScheduler scheduler) { super(SchedulerEventDispatcher.class.getName()); @@ -285,7 +288,7 @@ public void run() { SchedulerEvent event; - while (!Thread.currentThread().isInterrupted()) { + while (!stopped && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); } catch (InterruptedException e) { @@ -296,9 +299,13 @@ public void run() { try { scheduler.handle(event); } catch (Throwable t) { - LOG.error("Error in handling event type " + event.getType() + LOG.fatal("Error in handling event type " + event.getType() + " to the scheduler", t); - return; // TODO: Kill RM. + if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) { + LOG.info("Exiting, bbye.."); + System.exit(-1); + } } } } @@ -306,6 +313,7 @@ public void run() { @Override public synchronized void stop() { + this.stopped = true; this.eventProcessor.interrupt(); try { this.eventProcessor.join(); From ae7e43139dda16c501f6d7606f04d631e0a23c3e Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Tue, 21 Feb 2012 05:19:53 +0000 Subject: [PATCH 2/3] HADOOP-8084. Updates ProtoBufRpc engine to not do an unnecessary copy for RPC request/response. Contributed by Devaraj Das. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1291602 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../org/apache/hadoop/ipc/ProtobufRpcEngine.java | 14 ++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index cbc39e5d74..a4ee873802 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -86,6 +86,9 @@ Trunk (unreleased changes) HADOOP-8070. Add a standalone benchmark for RPC call performance. (todd) + HADOOP-8084. Updates ProtoBufRpc engine to not do an unnecessary copy + for RPC request/response. (ddas) + BUG FIXES HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 9b3862abdb..9a1a1615b0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputOutputStream; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; @@ -45,6 +46,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -268,13 +270,13 @@ public RpcRequestWritable() { @Override public void write(DataOutput out) throws IOException { - out.writeInt(message.toByteArray().length); - out.write(message.toByteArray()); + ((Message)message).writeDelimitedTo( + DataOutputOutputStream.constructOutputStream(out)); } @Override public void readFields(DataInput in) throws IOException { - int length = in.readInt(); + int length = ProtoUtil.readRawVarint32(in); byte[] bytes = new byte[length]; in.readFully(bytes); message = HadoopRpcRequestProto.parseFrom(bytes); @@ -297,13 +299,13 @@ public RpcResponseWritable(HadoopRpcResponseProto message) { @Override public void write(DataOutput out) throws IOException { - out.writeInt(message.toByteArray().length); - out.write(message.toByteArray()); + ((Message)message).writeDelimitedTo( + DataOutputOutputStream.constructOutputStream(out)); } @Override public void readFields(DataInput in) throws IOException { - int length = in.readInt(); + int length = ProtoUtil.readRawVarint32(in); byte[] bytes = new byte[length]; in.readFully(bytes); message = HadoopRpcResponseProto.parseFrom(bytes); From b2172c394e7ef40c4820abe6b830b64c7ceeec4e Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 21 Feb 2012 05:30:23 +0000 Subject: [PATCH 3/3] MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it to the maven build. Contributed by Ravi Prakash. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1291606 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 2 +- .../apache/hadoop/mapred/TestJobCleanup.java | 291 +++++++++--------- 3 files changed, 158 insertions(+), 138 deletions(-) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestJobCleanup.java (55%) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 602d71b262..63cd2e37f1 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -129,6 +129,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when their EventHandlers get exceptions. (vinodkv) + + MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it + to the maven build. (Ravi Prakash via vinodkv) Release 0.23.1 - 2012-02-17 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 2e17869f79..0dddd66d59 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -381,6 +381,7 @@ public void handle(JobFinishEvent event) { // this is the only job, so shut down the Appmaster // note in a workflow scenario, this may lead to creation of a new // job (FIXME?) + // Send job-end notification if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { try { LOG.info("Job end notification started for jobID : " @@ -407,7 +408,6 @@ public void handle(JobFinishEvent event) { LOG.info("Calling stop for all the services"); stop(); - // Send job-end notification } catch (Throwable t) { LOG.warn("Graceful stop failed ", t); } diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java similarity index 55% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java index 3635892d63..6139fdb05f 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java @@ -22,10 +22,8 @@ import java.io.File; import java.io.IOException; -import junit.extensions.TestSetup; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,64 +31,71 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; /** * A JUnit test to test Map-Reduce job cleanup. */ -public class TestJobCleanup extends TestCase { - private static String TEST_ROOT_DIR = - new File(System.getProperty("test.build.data", "/tmp") + "/" - + "test-job-cleanup").toString(); - private static final String CUSTOM_CLEANUP_FILE_NAME = - "_custom_cleanup"; - private static final String ABORT_KILLED_FILE_NAME = - "_custom_abort_killed"; - private static final String ABORT_FAILED_FILE_NAME = - "_custom_abort_failed"; +@SuppressWarnings("deprecation") +public class TestJobCleanup { + private static String TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp") + "/" + "test-job-cleanup").toString(); + private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup"; + private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed"; + private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed"; private static FileSystem fileSys = null; private static MiniMRCluster mr = null; private static Path inDir = null; private static Path emptyInDir = null; private static int outDirs = 0; - - public static Test suite() { - TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) { - protected void setUp() throws Exception { - JobConf conf = new JobConf(); - fileSys = FileSystem.get(conf); - fileSys.delete(new Path(TEST_ROOT_DIR), true); - conf.set("mapred.job.tracker.handler.count", "1"); - conf.set("mapred.job.tracker", "127.0.0.1:0"); - conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); - conf.set("mapred.task.tracker.http.address", "127.0.0.1:0"); - mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); - inDir = new Path(TEST_ROOT_DIR, "test-input"); - String input = "The quick brown fox\n" + "has many silly\n" - + "red fox sox\n"; - DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0)); - file.writeBytes(input); - file.close(); - emptyInDir = new Path(TEST_ROOT_DIR, "empty-input"); - fileSys.mkdirs(emptyInDir); - } - - protected void tearDown() throws Exception { - if (fileSys != null) { - fileSys.delete(new Path(TEST_ROOT_DIR), true); - fileSys.close(); - } - if (mr != null) { - mr.shutdown(); - } - } - }; - return setup; + private static Log LOG = LogFactory.getLog(TestJobCleanup.class); + + @BeforeClass + public static void setUp() throws IOException { + JobConf conf = new JobConf(); + fileSys = FileSystem.get(conf); + fileSys.delete(new Path(TEST_ROOT_DIR), true); + conf.set("mapred.job.tracker.handler.count", "1"); + conf.set("mapred.job.tracker", "127.0.0.1:0"); + conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); + conf.set("mapred.task.tracker.http.address", "127.0.0.1:0"); + conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, TEST_ROOT_DIR + + "/intermediate"); + conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + .SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true"); + + mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); + inDir = new Path(TEST_ROOT_DIR, "test-input"); + String input = "The quick brown fox\n" + "has many silly\n" + + "red fox sox\n"; + DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0)); + file.writeBytes(input); + file.close(); + emptyInDir = new Path(TEST_ROOT_DIR, "empty-input"); + fileSys.mkdirs(emptyInDir); } - - /** - * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)} - * making a _failed/_killed in the output folder + + @AfterClass + public static void tearDown() throws Exception { + if (fileSys != null) { + // fileSys.delete(new Path(TEST_ROOT_DIR), true); + fileSys.close(); + } + if (mr != null) { + mr.shutdown(); + } + } + + /** + * Committer with deprecated + * {@link FileOutputCommitter#cleanupJob(JobContext)} making a _failed/_killed + * in the output folder */ static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter { @Override @@ -101,31 +106,40 @@ public void cleanupJob(JobContext context) throws IOException { FileSystem fs = outputPath.getFileSystem(conf); fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close(); } + + @Override + public void commitJob(JobContext context) throws IOException { + cleanupJob(context); + } + + @Override + public void abortJob(JobContext context, int i) throws IOException { + cleanupJob(context); + } } - - /** + + /** * Committer with abort making a _failed/_killed in the output folder */ static class CommitterWithCustomAbort extends FileOutputCommitter { @Override - public void abortJob(JobContext context, int state) - throws IOException { - JobConf conf = context.getJobConf();; + public void abortJob(JobContext context, int state) throws IOException { + JobConf conf = context.getJobConf(); + ; Path outputPath = FileOutputFormat.getOutputPath(conf); FileSystem fs = outputPath.getFileSystem(conf); - String fileName = (state == JobStatus.FAILED) - ? TestJobCleanup.ABORT_FAILED_FILE_NAME - : TestJobCleanup.ABORT_KILLED_FILE_NAME; + String fileName = (state == JobStatus.FAILED) ? TestJobCleanup.ABORT_FAILED_FILE_NAME + : TestJobCleanup.ABORT_KILLED_FILE_NAME; fs.create(new Path(outputPath, fileName)).close(); } } - + private Path getNewOutputDir() { return new Path(TEST_ROOT_DIR, "output-" + outDirs++); } - - private void configureJob(JobConf jc, String jobName, int maps, int reds, - Path outDir) { + + private void configureJob(JobConf jc, String jobName, int maps, int reds, + Path outDir) { jc.setJobName(jobName); jc.setInputFormat(TextInputFormat.class); jc.setOutputKeyClass(LongWritable.class); @@ -137,36 +151,38 @@ private void configureJob(JobConf jc, String jobName, int maps, int reds, jc.setNumMapTasks(maps); jc.setNumReduceTasks(reds); } - + // run a job with 1 map and let it run to completion - private void testSuccessfulJob(String filename, - Class committer, String[] exclude) - throws IOException { + private void testSuccessfulJob(String filename, + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "job with cleanup()", 1, 0, outDir); jc.setOutputCommitter(committer); - + JobClient jobClient = new JobClient(jc); RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); job.waitForCompletion(); - + + LOG.info("Job finished : " + job.isComplete()); Path testFile = new Path(outDir, filename); - assertTrue("Done file missing for job " + id, fileSys.exists(testFile)); - + assertTrue("Done file \"" + testFile + "\" missing for job " + id, + fileSys.exists(testFile)); + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); - assertFalse("File " + file + " should not be present for successful job " - + id, fileSys.exists(file)); + assertFalse("File " + file + " should not be present for successful job " + + id, fileSys.exists(file)); } } - + // run a job for which all the attempts simply fail. - private void testFailedJob(String fileName, - Class committer, String[] exclude) - throws IOException { + private void testFailedJob(String fileName, + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "fail job with abort()", 1, 0, outDir); @@ -179,128 +195,129 @@ private void testFailedJob(String fileName, RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); job.waitForCompletion(); - + if (fileName != null) { Path testFile = new Path(outDir, fileName); - assertTrue("File " + testFile + " missing for failed job " + id, - fileSys.exists(testFile)); + assertTrue("File " + testFile + " missing for failed job " + id, + fileSys.exists(testFile)); } - + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); assertFalse("File " + file + " should not be present for failed job " - + id, fileSys.exists(file)); + + id, fileSys.exists(file)); } } - + // run a job which gets stuck in mapper and kill it. private void testKilledJob(String fileName, - Class committer, String[] exclude) - throws IOException { + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "kill job with abort()", 1, 0, outDir); // set the job to wait for long jc.setMapperClass(UtilsForTests.KillMapper.class); jc.setOutputCommitter(committer); - + JobClient jobClient = new JobClient(jc); RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); - JobInProgress jip = - mr.getJobTrackerRunner().getJobTracker().getJob(job.getID()); - + + Counters counters = job.getCounters(); + // wait for the map to be launched while (true) { - if (jip.runningMaps() == 1) { + if (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) == 1) { break; } + LOG.info("Waiting for a map task to be launched"); UtilsForTests.waitFor(100); + counters = job.getCounters(); } - + job.killJob(); // kill the job - + job.waitForCompletion(); // wait for the job to complete - + if (fileName != null) { Path testFile = new Path(outDir, fileName); - assertTrue("File " + testFile + " missing for job " + id, - fileSys.exists(testFile)); + assertTrue("File " + testFile + " missing for job " + id, + fileSys.exists(testFile)); } - + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); assertFalse("File " + file + " should not be present for killed job " - + id, fileSys.exists(file)); + + id, fileSys.exists(file)); } } - + /** * Test default cleanup/abort behavior - * + * * @throws IOException */ + @Test public void testDefaultCleanupAndAbort() throws IOException { // check with a successful job testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, - FileOutputCommitter.class, - new String[] {}); - + FileOutputCommitter.class, new String[] {}); + // check with a failed job - testFailedJob(null, - FileOutputCommitter.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); - + testFailedJob(null, FileOutputCommitter.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME }); + // check default abort job kill - testKilledJob(null, - FileOutputCommitter.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + testKilledJob(null, FileOutputCommitter.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME }); } - + /** * Test if a failed job with custom committer runs the abort code. - * + * * @throws IOException */ + @Test public void testCustomAbort() throws IOException { // check with a successful job - testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, - CommitterWithCustomAbort.class, - new String[] {ABORT_FAILED_FILE_NAME, - ABORT_KILLED_FILE_NAME}); - + testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, + CommitterWithCustomAbort.class, new String[] { ABORT_FAILED_FILE_NAME, + ABORT_KILLED_FILE_NAME }); + // check with a failed job - testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, - ABORT_KILLED_FILE_NAME}); - + testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME, + ABORT_KILLED_FILE_NAME }); + // check with a killed job - testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, - ABORT_FAILED_FILE_NAME}); + testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME, + ABORT_FAILED_FILE_NAME }); } /** * Test if a failed job with custom committer runs the deprecated - * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api + * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api * compatibility testing. */ + @Test public void testCustomCleanup() throws IOException { // check with a successful job - testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {}); - - // check with a failed job - testFailedJob(CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); - - // check with a killed job - testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {}); + + // check with a failed job + testFailedJob(CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + + // check with a killed job + testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); } }