diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ce94a7519e..48c147be01 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -367,6 +367,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4160. some mrv1 ant tests fail with timeout - due to 4156
(tgraves)
+ MAPREDUCE-4074. Client continuously retries to RM When RM goes down
+ before launching Application Master (xieguiming via tgraves)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 50b6eed7ce..8a6880af2f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -325,6 +325,13 @@ public interface MRJobConfig {
MR_PREFIX + "client-am.ipc.max-retries";
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
+ /**
+ * The number of client retries to the RM/HS/AM before throwing exception.
+ */
+ public static final String MR_CLIENT_MAX_RETRIES =
+ MR_PREFIX + "client.max-retries";
+ public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+
/** The staging directory for map reduce.*/
public static final String MR_AM_STAGING_DIR =
MR_AM_PREFIX+"staging-dir";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 5c533e6f26..b62259039a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1250,6 +1250,13 @@
to the RM to fetch Application Status.
+
+ yarn.app.mapreduce.client.max-retries
+ 3
+ The number of client retries to the RM/HS/AM before
+ throwing exception. This is a layer above the ipc.
+
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
index b2923ab124..c2a373750c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
@@ -282,7 +282,7 @@ MRClientProtocol instantiateAMProxy(final String serviceAddr)
}
private synchronized Object invoke(String method, Class argClass,
- Object args) throws YarnRemoteException {
+ Object args) throws IOException {
Method methodOb = null;
try {
methodOb = MRClientProtocol.class.getMethod(method, argClass);
@@ -291,7 +291,11 @@ private synchronized Object invoke(String method, Class argClass,
} catch (NoSuchMethodException e) {
throw new YarnException("Method name mismatch", e);
}
- while (true) {
+ int maxRetries = this.conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ IOException lastException = null;
+ while (maxRetries > 0) {
try {
return methodOb.invoke(getProxy(), args);
} catch (YarnRemoteException yre) {
@@ -308,13 +312,21 @@ private synchronized Object invoke(String method, Class argClass,
" retrying..", e.getTargetException());
// Force reconnection by setting the proxy to null.
realProxy = null;
+ // HS/AMS shut down
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+
} catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e);
// Force reconnection by setting the proxy to null.
realProxy = null;
+ // RM shutdown
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
}
}
+ throw lastException;
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
@@ -364,7 +376,7 @@ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg
return result;
}
- public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+ public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
@@ -390,7 +402,7 @@ public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
- throws YarnRemoteException, YarnRemoteException {
+ throws IOException{
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetTaskReportsRequest request =
@@ -407,7 +419,7 @@ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, T
}
public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
- throws YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
if (fail) {
@@ -423,7 +435,7 @@ public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
}
public boolean killJob(JobID oldJobID)
- throws YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID);
KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
index 9289ca7a56..55cfeeb944 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -122,8 +123,7 @@ public void testRetriesOnConnectionFailure() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
- new RuntimeException("1")).thenThrow(new RuntimeException("2"))
- .thenThrow(new RuntimeException("3"))
+ new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
@@ -135,7 +135,7 @@ public void testRetriesOnConnectionFailure() throws Exception {
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
- verify(historyServerProxy, times(4)).getJobReport(
+ verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class));
}
@@ -312,6 +312,74 @@ public void testAMAccessDisabled() throws IOException {
any(String.class));
}
+ @Test
+ public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
+ Configuration conf = new YarnConfiguration();
+ testRMDownForJobStatusBeforeGetAMReport(conf,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ }
+
+ @Test
+ public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
+ throws IOException {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
+ testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
+ }
+
+ @Test
+ public void testRMDownRestoreForJobStatusBeforeGetAMReport()
+ throws IOException {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
+
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
+ .thenReturn(getJobReportResponse());
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced1"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rmDelegate, oldJobId, historyServerProxy);
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
+ Assert.assertNotNull(jobStatus);
+ }
+
+ private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
+ int noOfRetries) throws YarnRemoteException {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced1"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced2"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced3")));
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rmDelegate, oldJobId, historyServerProxy);
+ try {
+ clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.fail("It should throw exception after retries");
+ } catch (IOException e) {
+ System.out.println("fail to get job status,and e=" + e.toString());
+ }
+ verify(rmDelegate, times(noOfRetries)).getApplicationReport(
+ any(ApplicationId.class));
+ }
+
private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId);