From a1ee2145cf4769ffb4d47f775b64e4a91f29d73f Mon Sep 17 00:00:00 2001
From: Bikas Saha
Date: Tue, 23 Apr 2013 03:06:41 +0000
Subject: [PATCH] YARN-549. YarnClient.submitApplication should wait for
application to be accepted by the RM (Zhijie Shen via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470797 13f79535-47bb-0310-9956-ffa450edef68
---
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/api/ClientRMProtocol.java | 4 +-
.../apache/hadoop/yarn/client/YarnClient.java | 4 +-
.../hadoop/yarn/client/YarnClientImpl.java | 29 ++++++
.../hadoop/yarn/client/TestYarnClient.java | 91 ++++++++++++++++++-
.../hadoop/yarn/conf/YarnConfiguration.java | 13 +++
.../src/main/resources/yarn-default.xml | 8 ++
7 files changed, 148 insertions(+), 4 deletions(-)
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8a1d076455..cdd10fcfa7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -277,6 +277,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-594. Update test and add comments in YARN-534 (Jian He via bikas)
+ YARN-549. YarnClient.submitApplication should wait for application to be
+ accepted by the RM (Zhijie Shen via bikas)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
index 8a19dad256..bfb4fce563 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
@@ -96,7 +96,9 @@ public GetNewApplicationResponse getNewApplication(
*
* Currently the ResourceManager
sends an immediate (empty)
* {@link SubmitApplicationResponse} on accepting the submission and throws
- * an exception if it rejects the submission.
+ * an exception if it rejects the submission. However, this call needs to be
+ * followed by {@link #getApplicationReport(GetApplicationReportRequest)}
+ * to verify that the application has actually been submitted successfully.
*
* In secure mode,the ResourceManager
verifies access to
* queues etc. before accepting the application submission.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
index 20235d065e..afd21d096a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
@@ -63,7 +63,9 @@ public interface YarnClient extends Service {
/**
*
- * Submit a new application to YARN.
+ * Submit a new application to <code>YARN</code>. It is a blocking call: it
+ * does not return the {@link ApplicationId} until the application has been
+ * submitted to and accepted by the ResourceManager.
*
*
* @param appContext
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
index eb84b31c79..6be9052994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
@@ -53,9 +53,11 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;
@@ -68,6 +70,7 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
protected ClientRMProtocol rmClient;
protected InetSocketAddress rmAddress;
+ protected long statePollIntervalMillis;
private static final String ROOT = "root";
@@ -90,6 +93,9 @@ public synchronized void init(Configuration conf) {
if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf);
}
+ statePollIntervalMillis = conf.getLong(
+ YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
super.init(conf);
}
@@ -131,6 +137,29 @@ public GetNewApplicationResponse getNewApplication()
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
rmClient.submitApplication(request);
+
+    // Block until the reported state is something other than NEW or
+    // NEW_SAVING. The wait has no upper bound: only a state change or an
+    // interrupt of the calling thread ends the loop.
+    int pollCount = 0;
+    while (true) {
+      YarnApplicationState state =
+          getApplicationReport(applicationId).getYarnApplicationState();
+      if (!state.equals(YarnApplicationState.NEW) &&
+          !state.equals(YarnApplicationState.NEW_SAVING)) {
+        break;
+      }
+      // Notify the client through the log every 10 polls, in case the client
+      // is blocked here too long.
+      if (++pollCount % 10 == 0) {
+        LOG.info("Application submission is not finished, " +
+            "submitted application " + applicationId +
+            " is still in " + state);
+      }
+      try {
+        Thread.sleep(statePollIntervalMillis);
+      } catch (InterruptedException ie) {
+        // Do not swallow the interrupt: an ignored interrupt would leave the
+        // caller stuck in this unbounded loop. Restore the flag so the caller
+        // can observe it, and stop waiting. The submit RPC above has already
+        // returned, so the application id is still handed back.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for application " + applicationId +
+            " to leave state " + state + "; returning without further polling");
+        break;
+      }
+    }
+
+
LOG.info("Submitted application " + applicationId + " to ResourceManager"
+ " at " + rmAddress);
return applicationId;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
index 3d7f1201f5..ccfc8d9956 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
@@ -18,10 +18,25 @@
package org.apache.hadoop.yarn.client;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import junit.framework.Assert;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestYarnClient {
@@ -43,4 +58,76 @@ public void testClientStop() {
client.start();
client.stop();
}
+
+  /**
+   * Submits one application per non-initial state and checks that
+   * {@code submitApplication} keeps reading the application state while the
+   * mocked report answers NEW or NEW_SAVING, and returns once any other state
+   * is reported.
+   */
+  @Test (timeout = 30000)
+  public void testSubmitApplication() {
+    Configuration conf = new Configuration();
+    // speed up tests
+    conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+        100);
+    // Keep a typed handle on the mock so its helpers are reachable without
+    // casting; all client calls still go through the YarnClient interface.
+    final MockYarnClient mockClient = new MockYarnClient();
+    final YarnClient client = mockClient;
+    client.init(conf);
+    client.start();
+
+    // Each of these states should end the wait inside submitApplication.
+    YarnApplicationState[] exitStates = {
+        YarnApplicationState.SUBMITTED,
+        YarnApplicationState.ACCEPTED,
+        YarnApplicationState.RUNNING,
+        YarnApplicationState.FINISHED,
+        YarnApplicationState.FAILED,
+        YarnApplicationState.KILLED
+    };
+    int submissions = 0;
+    for (YarnApplicationState exitState : exitStates) {
+      ApplicationId appId = Records.newRecord(ApplicationId.class);
+      appId.setClusterTimestamp(System.currentTimeMillis());
+      appId.setId(submissions);
+      ApplicationSubmissionContext submissionContext =
+          mock(ApplicationSubmissionContext.class);
+      when(submissionContext.getApplicationId()).thenReturn(appId);
+      mockClient.setYarnApplicationState(exitState);
+      try {
+        client.submitApplication(submissionContext);
+      } catch (YarnRemoteException e) {
+        Assert.fail("Exception is not expected.");
+      }
+      submissions++;
+      // The scripted report answers NEW, NEW_SAVING, NEW_SAVING and then the
+      // exit state, so every submission reads the state four times; the
+      // expected count accumulates over all submissions made so far.
+      verify(mockClient.mockReport, times(4 * submissions))
+          .getYarnApplicationState();
+    }
+
+    client.stop();
+  }
+
+  /**
+   * A {@link YarnClientImpl} whose RM protocol handle is replaced by Mockito
+   * mocks, letting a test script the sequence of application states that the
+   * client observes.
+   */
+  private static class MockYarnClient extends YarnClientImpl {
+    /** Report returned for every application report request. */
+    private ApplicationReport mockReport;
+
+    public MockYarnClient() {
+      super();
+    }
+
+    /**
+     * Installs the mocks instead of delegating to the superclass: any
+     * application report request is answered with a response that wraps
+     * {@link #mockReport}.
+     */
+    @Override
+    public void start() {
+      mockReport = mock(ApplicationReport.class);
+      GetApplicationReportResponse reportResponse =
+          mock(GetApplicationReportResponse.class);
+      when(reportResponse.getApplicationReport()).thenReturn(mockReport);
+
+      rmClient = mock(ClientRMProtocol.class);
+      try {
+        when(rmClient.getApplicationReport(
+            any(GetApplicationReportRequest.class)))
+            .thenReturn(reportResponse);
+      } catch (YarnRemoteException e) {
+        Assert.fail("Exception is not expected.");
+      }
+    }
+
+    /** Does nothing; the superclass stop logic is deliberately not invoked. */
+    @Override
+    public void stop() {
+    }
+
+    /**
+     * Scripts the states reported from now on: NEW, then NEW_SAVING twice,
+     * then the given state.
+     *
+     * @param state the state reported after the three initial ones
+     */
+    public void setYarnApplicationState(YarnApplicationState state) {
+      when(mockReport.getYarnApplicationState())
+          .thenReturn(YarnApplicationState.NEW)
+          .thenReturn(YarnApplicationState.NEW_SAVING)
+          .thenReturn(YarnApplicationState.NEW_SAVING)
+          .thenReturn(state);
+    }
+  }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1ff85eddc9..f9b017de42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -692,6 +692,19 @@ public class YarnConfiguration extends Configuration {
*/
public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+ ////////////////////////////////
+ // Other Configs
+ ////////////////////////////////
+
+ /**
+ * Configuration key for the interval, in milliseconds, at which the YARN
+ * client polls the state of an application after submitting it.
+ */
+ public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
+ YARN_PREFIX + "client.app-submission.poll-interval";
+ /** Default value, in milliseconds, for the submission poll interval. */
+ public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
+ 1000;
+
public YarnConfiguration() {
super();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 15775bed55..599f8a9edd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -708,4 +708,12 @@
$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*
+
+  <property>
+    <description>The interval, in milliseconds, at which the YARN client
+    polls the application state after submitting an application.</description>
+    <name>yarn.client.app-submission.poll-interval</name>
+    <value>1000</value>
+  </property>
+