YARN-1630. Introduce timeout for async polling operations in YarnClientImpl (Aditya Acharya via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1562289 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-28 22:27:46 +00:00
parent 2a20fe8370
commit 3f79e49624
4 changed files with 76 additions and 4 deletions

View File

@ -334,6 +334,9 @@ Release 2.4.0 - UNRELEASED
YARN-1573. ZK store should use a private password for root-node-acls.
(kasha).
YARN-1630. Introduce timeout for async polling operations in YarnClientImpl
(Aditya Acharya via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -1020,6 +1020,17 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "client.application-client-protocol.poll-interval-ms";
public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
200;
/**
* Configuration key for the maximum time, in milliseconds, that the yarn
* client library waits, cumulatively across polls, for an expected state
* change to occur. A value of zero or more is enforced as a timeout; any
* negative value indicates no limit.
*/
public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS =
YARN_PREFIX + "client.application-client-protocol.poll-timeout-ms";
/** Default poll timeout: -1, which indicates no limit. */
public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS =
-1;
/**
* Max number of threads in NMClientAsync to process container management
* events

View File

@ -86,6 +86,7 @@ public class YarnClientImpl extends YarnClient {
protected ApplicationClientProtocol rmClient;
protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis;
private long asyncApiPollTimeoutMillis;
protected AHSClient historyClient;
private boolean historyServiceEnabled;
@ -101,6 +102,9 @@ protected void serviceInit(Configuration conf) throws Exception {
asyncApiPollIntervalMillis =
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
asyncApiPollTimeoutMillis =
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS);
submitPollIntervalMillis = asyncApiPollIntervalMillis;
if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
!= null) {
@ -174,13 +178,24 @@ public YarnClientApplication createApplication()
rmClient.submitApplication(request);
int pollCount = 0;
long startTime = System.currentTimeMillis();
while (true) {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}
// Notify the client through the log every 10 polls, in case the client
// is blocked here for too long.
if (++pollCount % 10 == 0) {
@ -191,10 +206,11 @@ public YarnClientApplication createApplication()
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application " + applicationId
+ " to be successfully submitted.");
}
}
LOG.info("Submitted application " + applicationId);
return applicationId;
}
@ -207,15 +223,25 @@ public void killApplication(ApplicationId applicationId)
try {
int pollCount = 0;
long startTime = System.currentTimeMillis();
while (true) {
KillApplicationResponse response =
rmClient.forceKillApplication(request);
if (response.getIsKillCompleted()) {
LOG.info("Killed application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be killed.");
}
if (++pollCount % 10 == 0) {
LOG.info("Watiting for application " + applicationId
+ " to be killed.");
LOG.info("Waiting for application " + applicationId + " to be killed.");
}
Thread.sleep(asyncApiPollIntervalMillis);
}
@ -223,7 +249,11 @@ public void killApplication(ApplicationId applicationId)
LOG.error("Interrupted while waiting for application " + applicationId
+ " to be killed.");
}
LOG.info("Killed application " + applicationId);
}
/**
 * Reports whether the cumulative poll timeout applies to the async API
 * polling loops.
 *
 * @return {@code true} when the configured timeout is zero or positive;
 *         {@code false} when it is negative, meaning polling is unbounded
 */
@VisibleForTesting
boolean enforceAsyncAPITimeout() {
  // Only a negative timeout switches enforcement off.
  return !(asyncApiPollTimeoutMillis < 0);
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -35,6 +36,7 @@
import junit.framework.Assert;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@ -474,4 +476,30 @@ private void waitTillAccepted(YarnClient rmClient, ApplicationId appId)
}
}
/**
 * Checks timeout enforcement for an unset timeout, a zero timeout and a
 * positive timeout, in that order.
 */
@Test
public void testAsyncAPIPollTimeout() {
  // Parallel tables: configured timeout (null = leave the key unset) and
  // whether the client is then expected to enforce a timeout.
  Long[] configuredTimeouts = {null, 0L, 1L};
  boolean[] expectedEnforcement = {false, true, true};
  for (int i = 0; i < configuredTimeouts.length; i++) {
    testAsyncAPIPollTimeoutHelper(configuredTimeouts[i],
        expectedEnforcement[i]);
  }
}
/**
 * Initializes a fresh {@link YarnClientImpl} with the given poll timeout and
 * asserts whether it reports the timeout as enforced.
 *
 * @param valueForTimeout timeout to store under the poll-timeout key, or
 *        {@code null} to leave the key unset
 * @param expectedTimeoutEnforcement expected result of
 *        {@code enforceAsyncAPITimeout()} after initialization
 */
private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout,
    boolean expectedTimeoutEnforcement) {
  YarnClientImpl yarnClient = new YarnClientImpl();
  try {
    Configuration configuration = new Configuration();
    boolean overrideTimeout = valueForTimeout != null;
    if (overrideTimeout) {
      String timeoutKey =
          YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS;
      configuration.setLong(timeoutKey, valueForTimeout);
    }
    yarnClient.init(configuration);
    boolean actualTimeoutEnforcement = yarnClient.enforceAsyncAPITimeout();
    Assert.assertEquals(expectedTimeoutEnforcement, actualTimeoutEnforcement);
  } finally {
    // Always release the client, even when init or the assertion fails.
    IOUtils.closeQuietly(yarnClient);
  }
}
}