diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f0fcbbe198..396c82b26b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -334,6 +334,9 @@ Release 2.4.0 - UNRELEASED YARN-1573. ZK store should use a private password for root-node-acls. (kasha). + YARN-1630. Introduce timeout for async polling operations in YarnClientImpl + (Aditya Acharya via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ea9b93aae1..32665d79bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1020,6 +1020,17 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "client.application-client-protocol.poll-interval-ms"; public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS = 200; + + /** + * The duration that the yarn client library waits, cumulatively across polls, + * for an expected state change to occur. Defaults to -1, which indicates no + * limit. + */ + public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS = + YARN_PREFIX + "client.application-client-protocol.poll-timeout-ms"; + public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS = + -1; + /** * Max number of threads in NMClientAsync to process container management * events diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 2e75433578..ac80240d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -86,6 +86,7 @@ public class YarnClientImpl extends YarnClient { protected ApplicationClientProtocol rmClient; protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; + private long asyncApiPollTimeoutMillis; protected AHSClient historyClient; private boolean historyServiceEnabled; @@ -101,6 +102,9 @@ protected void serviceInit(Configuration conf) throws Exception { asyncApiPollIntervalMillis = conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + asyncApiPollTimeoutMillis = + conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS); submitPollIntervalMillis = asyncApiPollIntervalMillis; if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS) != null) { @@ -174,13 +178,24 @@ public YarnClientApplication createApplication() rmClient.submitApplication(request); int pollCount = 0; + long startTime = System.currentTimeMillis(); + while (true) { YarnApplicationState state = getApplicationReport(applicationId).getYarnApplicationState(); if (!state.equals(YarnApplicationState.NEW) && !state.equals(YarnApplicationState.NEW_SAVING)) { + LOG.info("Submitted application " + applicationId); break; } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() && + elapsedMillis >= asyncApiPollTimeoutMillis) { + throw new YarnException("Timed out while waiting for application " + + applicationId + " to be submitted successfully"); + } + // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { @@ -191,10 +206,11 @@ public YarnClientApplication createApplication() try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for application " + applicationId + + " to be successfully submitted."); } } - LOG.info("Submitted application " + applicationId); return applicationId; } @@ -207,15 +223,25 @@ public void killApplication(ApplicationId applicationId) try { int pollCount = 0; + long startTime = System.currentTimeMillis(); + while (true) { KillApplicationResponse response = rmClient.forceKillApplication(request); if (response.getIsKillCompleted()) { + LOG.info("Killed application " + applicationId); break; } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() && + elapsedMillis >= this.asyncApiPollTimeoutMillis) { + throw new YarnException("Timed out while waiting for application " + + applicationId + " to be killed."); + } + if (++pollCount % 10 == 0) { - LOG.info("Watiting for application " + applicationId - + " to be killed."); + LOG.info("Waiting for application " + applicationId + " to be killed."); } Thread.sleep(asyncApiPollIntervalMillis); } @@ -223,7 +249,11 @@ public void killApplication(ApplicationId applicationId) LOG.error("Interrupted while waiting for application " + applicationId + " to be killed."); } - LOG.info("Killed application " + applicationId); + } + + @VisibleForTesting + boolean enforceAsyncAPITimeout() { + return asyncApiPollTimeoutMillis >= 0; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index dc6d98e692..7c3496656b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api.impl; +import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -35,6 +36,7 @@ import junit.framework.Assert; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -474,4 +476,30 @@ private void waitTillAccepted(YarnClient rmClient, ApplicationId appId) } } + @Test + public void testAsyncAPIPollTimeout() { + testAsyncAPIPollTimeoutHelper(null, false); + testAsyncAPIPollTimeoutHelper(0L, true); + testAsyncAPIPollTimeoutHelper(1L, true); + } + + private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout, + boolean expectedTimeoutEnforcement) { + YarnClientImpl client = new YarnClientImpl(); + try { + Configuration conf = new Configuration(); + if (valueForTimeout != null) { + conf.setLong( + YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, + valueForTimeout); + } + + client.init(conf); + + Assert.assertEquals( + expectedTimeoutEnforcement, client.enforceAsyncAPITimeout()); + } finally { + IOUtils.closeQuietly(client); + } + } }