From af78fd729c3b847c447d4a8edd758fb0c9b25b02 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 19 Sep 2013 22:35:12 +0000 Subject: [PATCH] MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that on attempt's AM is down. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524856 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../dev-support/findbugs-exclude.xml | 6 +++ .../apache/hadoop/mapreduce/MRJobConfig.java | 2 +- .../src/main/resources/mapred-default.xml | 4 +- .../hadoop/mapred/ClientServiceDelegate.java | 45 +++++++++++++++---- .../mapred/TestClientServiceDelegate.java | 42 +++++++++++++++++ 6 files changed, 91 insertions(+), 11 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 63b24f55dc..70ff15eb64 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -193,6 +193,9 @@ Release 2.2.0 - UNRELEASED MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta via tgraves) + MAPREDUCE-5488. Changed MR client to keep trying to reach the application + when it sees that on attempt's AM is down. (Jian He via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml index 25fada378a..11d4643066 100644 --- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml @@ -496,6 +496,12 @@ + + + + + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 85f6b96e29..24c3399449 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -357,7 +357,7 @@ public interface MRJobConfig { public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3; /** - * The number of client retries to the RM/HS/AM before throwing exception. + * The number of client retries to the RM/HS before throwing exception. */ public static final String MR_CLIENT_MAX_RETRIES = MR_PREFIX + "client.max-retries"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index cf28e4d288..db63f4511f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -982,7 +982,7 @@ yarn.app.mapreduce.client-am.ipc.max-retries - 1 + 3 The number of client retries to the AM - before reconnecting to the RM to fetch Application Status. @@ -990,7 +990,7 @@ yarn.app.mapreduce.client.max-retries 3 - The number of client retries to the RM/HS/AM before + The number of client retries to the RM/HS before throwing exception. This is a layer above the ipc. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 77a0ad2b8c..54c5b60886 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; + public class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); private static final String UNAVAILABLE = "N/A"; @@ -93,7 +96,8 @@ public class ClientServiceDelegate { private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static String UNKNOWN_USER = "Unknown User"; private String trackingUrl; - + private AtomicBoolean usingAMProxy = new AtomicBoolean(false); + private int maxClientRetry; private boolean amAclDisabledStatusLogged = false; public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, @@ -287,6 +291,7 @@ MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr) MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, serviceAddr, conf); + usingAMProxy.set(true); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); return proxy; } @@ -301,13 +306,15 @@ private synchronized Object invoke(String method, Class argClass, } catch (NoSuchMethodException e) { throw new YarnRuntimeException("Method name mismatch", e); } - int maxRetries = this.conf.getInt( + maxClientRetry = this.conf.getInt( MRJobConfig.MR_CLIENT_MAX_RETRIES, MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES); IOException lastException = null; - while (maxRetries > 0) { + while (maxClientRetry > 0) { + MRClientProtocol MRClientProxy = null; try { - return methodOb.invoke(getProxy(), args); + MRClientProxy = getProxy(); + return methodOb.invoke(MRClientProxy, args); } catch (InvocationTargetException e) { // Will not throw out YarnException anymore LOG.debug("Failed to contact AM/History for job " + jobId + @@ -315,22 +322,44 @@ private synchronized Object invoke(String method, Class argClass, // Force reconnection by setting the proxy to null. realProxy = null; // HS/AMS shut down - maxRetries--; + // if it's AM shut down, do not decrement maxClientRetry as we wait for + // AM to be restarted. + if (!usingAMProxy.get()) { + maxClientRetry--; + } + usingAMProxy.set(false); lastException = new IOException(e.getTargetException()); - + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("ClientServiceDelegate invoke call interrupted", ie); + throw new YarnRuntimeException(ie); + } } catch (Exception e) { LOG.debug("Failed to contact AM/History for job " + jobId + " Will retry..", e); // Force reconnection by setting the proxy to null. realProxy = null; // RM shutdown - maxRetries--; - lastException = new IOException(e.getMessage()); + maxClientRetry--; + lastException = new IOException(e.getMessage()); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("ClientServiceDelegate invoke call interrupted", ie); + throw new YarnRuntimeException(ie); + } } } throw lastException; } + // Only for testing + @VisibleForTesting + public int getMaxClientRetry() { + return this.maxClientRetry; + } + public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index 7889774e48..7fbd2abab7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -140,6 +140,48 @@ public void testRetriesOnConnectionFailure() throws Exception { any(GetJobReportRequest.class)); } + @Test + public void testRetriesOnAMConnectionFailures() throws Exception { + if (!isAMReachableFromClient) { + return; + } + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(getRunningApplicationReport("am1", 78)); + + // throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and + // succeed in the 5th call. + final MRClientProtocol amProxy = mock(MRClientProtocol.class); + when(amProxy.getJobReport(any(GetJobReportRequest.class))) + .thenThrow(new RuntimeException("11")) + .thenThrow(new RuntimeException("22")) + .thenThrow(new RuntimeException("33")) + .thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse()); + Configuration conf = new YarnConfiguration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, + !isAMReachableFromClient); + ClientServiceDelegate clientServiceDelegate = + new ClientServiceDelegate(conf, rm, oldJobId, null) { + @Override + MRClientProtocol instantiateAMProxy( + final InetSocketAddress serviceAddr) throws IOException { + super.instantiateAMProxy(serviceAddr); + return amProxy; + } + }; + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + + Assert.assertNotNull(jobStatus); + // assert maxClientRetry is not decremented. + Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, + MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate + .getMaxClientRetry()); + verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class)); + } + @Test public void testHistoryServerNotConfigured() throws Exception { //RM doesn't have app report and job History Server is not configured