From ee3825e2783149db7a3d648f31cb1a483de64c0f Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Sat, 9 Aug 2014 18:44:51 +0000 Subject: [PATCH] YARN-1954. Added waitFor to AMRMClient(Async). Contributed by Tsuyoshi Ozawa. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617002 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/client/api/AMRMClient.java | 63 +++++++++++++++ .../client/api/async/AMRMClientAsync.java | 64 ++++++++++++++++ .../api/async/impl/TestAMRMClientAsync.java | 76 ++++++++++++++++++- .../yarn/client/api/impl/TestAMRMClient.java | 35 +++++++++ 5 files changed, 237 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a62e05ca75..c47acfde65 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -103,6 +103,8 @@ Release 2.6.0 - UNRELEASED YARN-2026. Fair scheduler: Consider only active queues for computing fairshare. (Ashwin Shankar via kasha) + YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 3daa156896..f41c018cea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @InterfaceAudience.Public @InterfaceStability.Stable public abstract class AMRMClient extends AbstractService { + private static final Log LOG = LogFactory.getLog(AMRMClient.class); /** * Create a new instance of AMRMClient. @@ -336,4 +340,63 @@ public NMTokenCache getNMTokenCache() { return nmTokenCache; } + /** + * Wait for check to return true for each 1000 ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int)} + * and {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check + */ + public void waitFor(Supplier check) throws InterruptedException { + waitFor(check, 1000); + } + + /** + * Wait for check to return true for each + * checkEveryMillis ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check user defined checker + * @param checkEveryMillis interval to call check + */ + public void waitFor(Supplier check, int checkEveryMillis) + throws InterruptedException { + waitFor(check, checkEveryMillis, 1); + } + + /** + * Wait for check to return true for each + * checkEveryMillis ms. In the main loop, this method will log + * the message "waiting in main loop" for each logInterval times + * iteration to confirm the thread is alive. + * @param check user defined checker + * @param checkEveryMillis interval to call check + * @param logInterval interval to log for each + */ + public void waitFor(Supplier check, int checkEveryMillis, + int logInterval) throws InterruptedException { + Preconditions.checkNotNull(check, "check should not be null"); + Preconditions.checkArgument(checkEveryMillis >= 0, + "checkEveryMillis should be positive value"); + Preconditions.checkArgument(logInterval >= 0, + "logInterval should be positive value"); + + int loggingCounter = logInterval; + do { + if (LOG.isDebugEnabled()) { + LOG.debug("Check the condition for main loop."); + } + + boolean result = check.get(); + if (result) { + LOG.info("Exits the main loop."); + return; + } + if (--loggingCounter <= 0) { + LOG.info("Waiting in main loop."); + loggingCounter = logInterval; + } + + Thread.sleep(checkEveryMillis); + } while (true); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index e726b73651..af26da1799 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -18,11 +18,15 @@ package org.apache.hadoop.yarn.client.api.async; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @@ -90,6 +94,7 @@ @Stable public abstract class AMRMClientAsync extends AbstractService { + private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); protected final AMRMClient client; protected final CallbackHandler handler; @@ -189,6 +194,65 @@ public abstract void unregisterApplicationMaster( */ public abstract int getClusterNodeCount(); + /** + * Wait for check to return true for each 1000 ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int)} + * and {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check + */ + public void waitFor(Supplier check) throws InterruptedException { + waitFor(check, 1000); + } + + /** + * Wait for check to return true for each + * checkEveryMillis ms. + * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} + * @param check user defined checker + * @param checkEveryMillis interval to call check + */ + public void waitFor(Supplier check, int checkEveryMillis) + throws InterruptedException { + waitFor(check, checkEveryMillis, 1); + }; + + /** + * Wait for check to return true for each + * checkEveryMillis ms. In the main loop, this method will log + * the message "waiting in main loop" for each logInterval times + * iteration to confirm the thread is alive. + * @param check user defined checker + * @param checkEveryMillis interval to call check + * @param logInterval interval to log for each + */ + public void waitFor(Supplier check, int checkEveryMillis, + int logInterval) throws InterruptedException { + Preconditions.checkNotNull(check, "check should not be null"); + Preconditions.checkArgument(checkEveryMillis >= 0, + "checkEveryMillis should be positive value"); + Preconditions.checkArgument(logInterval >= 0, + "logInterval should be positive value"); + + int loggingCounter = logInterval; + do { + if (LOG.isDebugEnabled()) { + LOG.debug("Check the condition for main loop."); + } + + boolean result = check.get(); + if (result) { + LOG.info("Exits the main loop."); + return; + } + if (--loggingCounter <= 0) { + LOG.info("Waiting in main loop."); + loggingCounter = logInterval; + } + + Thread.sleep(checkEveryMillis); + } while (true); + } + public interface CallbackHandler { /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 728a558df3..79f53fc4f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api.async.impl; +import com.google.common.base.Supplier; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; @@ -180,7 +181,7 @@ private void runHeartBeatThrowOutException(Exception ex) throws Exception{ AMRMClient client = mock(AMRMClientImpl.class); when(client.allocate(anyFloat())).thenThrow(ex); - AMRMClientAsync asyncClient = + AMRMClientAsync asyncClient = AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); @@ -228,6 +229,41 @@ public void testAMRMClientAsyncShutDown() throws Exception { asyncClient.stop(); } + @Test (timeout = 10000) + public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception { + Configuration conf = new Configuration(); + final TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + final AllocateResponse shutDownResponse = createAllocateResponse( + new ArrayList(), new ArrayList(), null); + shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); + when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + + AMRMClientAsync asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + Supplier checker = new Supplier() { + @Override + public Boolean get() { + return callbackHandler.reboot; + } + }; + + asyncClient.registerApplicationMaster("localhost", 1234, null); + asyncClient.waitFor(checker); + + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); + + verify(client, times(1)).allocate(anyFloat()); + asyncClient.stop(); + } + @Test (timeout = 5000) public void testCallAMRMClientAsyncStopFromCallbackHandler() throws YarnException, IOException, InterruptedException { @@ -262,6 +298,40 @@ public void testCallAMRMClientAsyncStopFromCallbackHandler() } } + @Test (timeout = 5000) + public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor() + throws YarnException, IOException, InterruptedException { + Configuration conf = new Configuration(); + final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + List completed = Arrays.asList( + ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), + ContainerState.COMPLETE, "", 0)); + final AllocateResponse response = createAllocateResponse(completed, + new ArrayList(), null); + + when(client.allocate(anyFloat())).thenReturn(response); + + AMRMClientAsync asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + callbackHandler.asynClient = asyncClient; + asyncClient.init(conf); + asyncClient.start(); + + Supplier checker = new Supplier() { + @Override + public Boolean get() { + return callbackHandler.notify; + } + }; + + asyncClient.registerApplicationMaster("localhost", 1234, null); + asyncClient.waitFor(checker); + Assert.assertTrue(checker.get()); + } + void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws InterruptedException, YarnException, IOException { Configuration conf = new Configuration(); @@ -342,7 +412,7 @@ private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler { private volatile List completedContainers; private volatile List allocatedContainers; Exception savedException = null; - boolean reboot = false; + volatile boolean reboot = false; Object notifier = new Object(); int callbackCount = 0; @@ -432,7 +502,7 @@ private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler { @SuppressWarnings("rawtypes") AMRMClientAsync asynClient; boolean stop = true; - boolean notify = false; + volatile boolean notify = false; boolean throwOutException = false; @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index d7fb752bb4..38dbf79da9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api.impl; +import com.google.common.base.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -814,6 +815,40 @@ public AllocateResponse answer(InvocationOnMock invocation) assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); } + + class CountDownSupplier implements Supplier { + int counter = 0; + @Override + public Boolean get() { + counter++; + if (counter >= 3) { + return true; + } else { + return false; + } + } + }; + + @Test + public void testWaitFor() throws InterruptedException { + AMRMClientImpl amClient = null; + CountDownSupplier countDownChecker = new CountDownSupplier(); + + try { + // start am rm client + amClient = + (AMRMClientImpl) AMRMClient + . createAMRMClient(); + amClient.init(new YarnConfiguration()); + amClient.start(); + amClient.waitFor(countDownChecker, 1000); + assertEquals(3, countDownChecker.counter); + } finally { + if (amClient != null) { + amClient.stop(); + } + } + } private void sleep(int sleepTime) { try {