From b980f2aa4e0191811e963ae4118c53af9cb4f1b8 Mon Sep 17 00:00:00 2001
From: Bikas Saha
Date: Sat, 13 Jul 2013 23:30:20 +0000
Subject: [PATCH] YARN-763. AMRMClientAsync should stop heartbeating after
 receiving shutdown from RM (Xuan Gong via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1502914 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt              |  3 +
 .../api/async/impl/AMRMClientAsyncImpl.java  | 28 ++----
 .../api/async/impl/TestAMRMClientAsync.java  | 99 +++++++++++++++++++
 3 files changed, 112 insertions(+), 18 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index afc545a3ed..410e4eed1e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -686,6 +686,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-541. getAllocatedContainers() is not returning all the allocated
     containers (bikas)
 
+    YARN-763. AMRMClientAsync should stop heartbeating after receiving
+    shutdown from RM (Xuan Gong via bikas)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index cc3969dbe3..f7002868bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -92,6 +93,7 @@ protected void serviceInit(Configuration conf) throws Exception {
 
   @Override
   protected void serviceStart() throws Exception {
+    handlerThread.setDaemon(true);
     handlerThread.start();
     client.start();
     super.serviceStart();
@@ -99,27 +101,19 @@ protected void serviceStart() throws Exception {
 
   /**
    * Tells the heartbeat and handler threads to stop and waits for them to
-   * terminate. Calling this method from the callback handler thread would cause
-   * deadlock, and thus should be avoided.
+   * terminate.
    */
   @Override
   protected void serviceStop() throws Exception {
-    if (Thread.currentThread() == handlerThread) {
-      throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
-    }
     keepRunning = false;
+    heartbeatThread.interrupt();
     try {
       heartbeatThread.join();
     } catch (InterruptedException ex) {
       LOG.error("Error joining with heartbeat thread", ex);
     }
     client.stop();
-    try {
-      handlerThread.interrupt();
-      handlerThread.join();
-    } catch (InterruptedException ex) {
-      LOG.error("Error joining with hander thread", ex);
-    }
+    handlerThread.interrupt();
     super.serviceStop();
   }
 
@@ -248,6 +242,10 @@ public void run() {
         while (true) {
           try {
             responseQueue.put(response);
+            if (response.getAMCommand() == AMCommand.AM_RESYNC
+                || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+              return;
+            }
             break;
           } catch (InterruptedException ex) {
             LOG.info("Interrupted while waiting to put on response queue", ex);
@@ -285,24 +283,18 @@ public void run() {
         }
 
         if (response.getAMCommand() != null) {
-          boolean stop = false;
           switch(response.getAMCommand()) {
           case AM_RESYNC:
          case AM_SHUTDOWN:
             handler.onShutdownRequest();
             LOG.info("Shutdown requested. Stopping callback.");
-            stop = true;
-            break;
+            return;
           default:
             String msg = "Unhandled value of AMCommand: " + response.getAMCommand();
             LOG.error(msg);
             throw new YarnRuntimeException(msg);
           }
-          if(stop) {
-            // should probably stop heartbeating also YARN-763
-            break;
-          }
         }
         List<NodeReport> updatedNodes = response.getUpdatedNodes();
         if (!updatedNodes.isEmpty()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index b50742df68..8dd4ac6e66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -23,7 +23,10 @@
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -218,6 +221,65 @@ public void testAMRMClientAsyncReboot() throws Exception {
     Assert.assertTrue(callbackHandler.callbackCount == 0);
   }
 
+  @Test (timeout = 10000)
+  public void testAMRMClientAsyncShutDown() throws Exception {
+    Configuration conf = new Configuration();
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    final AllocateResponse shutDownResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+    shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+    when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+    Thread.sleep(50);
+
+    verify(client, times(1)).allocate(anyFloat());
+    asyncClient.stop();
+  }
+
+  @Test (timeout = 5000)
+  public void testCallAMRMClientAsyncStopFromCallbackHandler()
+      throws YarnException, IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    List<ContainerStatus> completed = Arrays.asList(
+        ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+            ContainerState.COMPLETE, "", 0));
+    final AllocateResponse response = createAllocateResponse(completed,
+        new ArrayList<Container>(), null);
+
+    when(client.allocate(anyFloat())).thenReturn(response);
+
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    callbackHandler.registerAsyncClient(asyncClient);
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.stop == false) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
   private AllocateResponse createAllocateResponse(
       List<ContainerStatus> completed, List<Container> allocated,
       List<NMToken> nmTokens) {
@@ -323,4 +385,41 @@ public void onError(Exception e) {
       }
     }
   }
+
+  private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler {
+    Object notifier = new Object();
+    @SuppressWarnings("rawtypes")
+    AMRMClientAsync asynClient;
+    boolean stop = false;
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {}
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {}
+
+    @Override
+    public void onShutdownRequest() {}
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    @Override
+    public float getProgress() {
+      asynClient.stop();
+      stop = true;
+      synchronized (notifier) {
+        notifier.notifyAll();
+      }
+      return 0;
+    }
+
+    @Override
+    public void onError(Exception e) {}
+
+    public void registerAsyncClient(
+        AMRMClientAsync<ContainerRequest> asyncClient) {
+      this.asynClient = asyncClient;
+    }
+  }
 }
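
Reviewer note, not part of the patch: after this change the heartbeat thread exits on its
own once the RM answers with AM_SHUTDOWN or AM_RESYNC, and serviceStop() no longer throws
or joins the handler thread, so stop() may be issued from the callback handler itself. The
sketch below shows one way an application master might wire its handler to react to
onShutdownRequest(); the class name ShutdownAwareHandler, the setClient() helper, and the
done flag are illustrative only and do not exist in this patch or in YARN.

    import java.util.List;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

    // Hypothetical AM-side handler: stops the async client when the RM requests shutdown.
    class ShutdownAwareHandler implements AMRMClientAsync.CallbackHandler {
      private volatile AMRMClientAsync<?> asyncClient;
      private volatile boolean done = false;

      // Called by the AM after creating the AMRMClientAsync so the handler can stop it.
      void setClient(AMRMClientAsync<?> client) {
        this.asyncClient = client;
      }

      @Override
      public void onShutdownRequest() {
        // With this patch the heartbeat thread has already stopped by the time this
        // callback fires, and the old "deadlock from the handler thread" caveat on
        // serviceStop() is gone, so stopping from here should be permitted.
        done = true;
        asyncClient.stop();
      }

      @Override
      public void onContainersCompleted(List<ContainerStatus> statuses) {}

      @Override
      public void onContainersAllocated(List<Container> containers) {}

      @Override
      public void onNodesUpdated(List<NodeReport> updatedNodes) {}

      @Override
      public void onError(Exception e) {
        // Illustrative choice: treat errors as fatal and stop heartbeating as well.
        done = true;
        asyncClient.stop();
      }

      @Override
      public float getProgress() {
        return done ? 1.0f : 0.0f;
      }
    }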