diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f840a9e632..8e87f4a9ef 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -104,6 +104,9 @@ Release 2.9.0 - UNRELEASED YARN-4603. FairScheduler should mention user requested queuename in error message when failed in queue ACL check. (Tao Jie via kasha) + YARN-4496. Improve HA ResourceManager Failover detection on the client. + (Jian He via xgong) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java new file mode 100644 index 0000000000..6fd6591238 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.junit.Assert; +import org.junit.Test; + + +public class TestHedgingRequestRMFailoverProxyProvider { + + @Test + public void testHedgingRequestProxyProvider() throws Exception { + final MiniYARNCluster cluster = + new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1); + Configuration conf = new YarnConfiguration(); + + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4,rm5"); + + conf.set(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, + RequestHedgingRMFailoverProxyProvider.class.getName()); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + 2000); + + HATestUtil.setRpcAddressForRM("rm1", 10000, conf); + HATestUtil.setRpcAddressForRM("rm2", 20000, conf); + HATestUtil.setRpcAddressForRM("rm3", 30000, conf); + HATestUtil.setRpcAddressForRM("rm4", 40000, conf); + HATestUtil.setRpcAddressForRM("rm5", 50000, conf); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + + cluster.init(conf); + cluster.start(); + + final YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + + // Transition rm5 to active; + long start = System.currentTimeMillis(); + makeRMActive(cluster, 4); + // client will retry until the rm becomes active. + client.getAllQueues(); + long end = System.currentTimeMillis(); + System.out.println("Client call succeeded at " + end); + // should return the response fast + Assert.assertTrue(end - start <= 10000); + + // transition rm5 to standby + cluster.getResourceManager(4).getRMContext().getRMAdminService() + .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + + makeRMActive(cluster, 2); + client.getAllQueues(); + cluster.stop(); + } + + private void makeRMActive(final MiniYARNCluster cluster, final int index) { + Thread t = new Thread() { + @Override public void run() { + try { + System.out.println("Transition rm" + index + " to active"); + cluster.getResourceManager(index).getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + t.start(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index 5577d20775..8676db244a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -45,8 +45,8 @@ public class ConfiguredRMFailoverProxyProvider private int currentProxyIndex = 0; Map proxies = new HashMap(); - private RMProxy rmProxy; - private Class protocol; + protected RMProxy rmProxy; + protected Class protocol; protected YarnConfiguration conf; protected String[] rmServiceIds; @@ -71,7 +71,7 @@ public class ConfiguredRMFailoverProxyProvider YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS)); } - private T getProxyInternal() { + protected T getProxyInternal() { try { final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); return RMProxy.getProxy(conf, protocol, rmAddress); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 3779ce5412..3ab06bd11e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -40,6 +40,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -77,6 +78,7 @@ public class RMProxy { } /** + * Currently, used by Client and AM only * Create a proxy for the specified protocol. For non-HA, * this is a direct connection to the ResourceManager address. When HA is * enabled, the proxy handles the failover between the ResourceManagers as @@ -88,12 +90,12 @@ public class RMProxy { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = - createRetryPolicy(conf); - return createRMProxy(conf, protocol, instance, retryPolicy); + RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); + return newProxyInstance(conf, protocol, instance, retryPolicy); } /** + * Currently, used by NodeManagers only. * Create a proxy for the specified protocol. For non-HA, * this is a direct connection to the ResourceManager address. When HA is * enabled, the proxy handles the failover between the ResourceManagers as @@ -106,12 +108,12 @@ public class RMProxy { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = - createRetryPolicy(conf, retryTime, retryInterval); - return createRMProxy(conf, protocol, instance, retryPolicy); + RetryPolicy retryPolicy = createRetryPolicy(conf, retryTime, retryInterval, + HAUtil.isHAEnabled(conf)); + return newProxyInstance(conf, protocol, instance, retryPolicy); } - private static T createRMProxy(final YarnConfiguration conf, + private static T newProxyInstance(final YarnConfiguration conf, final Class protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException{ if (HAUtil.isHAEnabled(conf)) { @@ -144,7 +146,7 @@ public class RMProxy { @Deprecated public static T createRMProxy(final Configuration conf, final Class protocol, InetSocketAddress rmAddress) throws IOException { - RetryPolicy retryPolicy = createRetryPolicy(conf); + RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); T proxy = RMProxy.getProxy(conf, protocol, rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); @@ -194,7 +196,8 @@ public class RMProxy { */ @Private @VisibleForTesting - public static RetryPolicy createRetryPolicy(Configuration conf) { + public static RetryPolicy createRetryPolicy(Configuration conf, + boolean isHAEnabled) { long rmConnectWaitMS = conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, @@ -204,16 +207,17 @@ public class RMProxy { YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, YarnConfiguration .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); - return createRetryPolicy( - conf, rmConnectWaitMS, rmConnectionRetryIntervalMS); + + return createRetryPolicy(conf, rmConnectWaitMS, rmConnectionRetryIntervalMS, + isHAEnabled); } /** * Fetch retry policy from Configuration and create the * retry policy with specified retryTime and retry interval. */ - private static RetryPolicy createRetryPolicy(Configuration conf, - long retryTime, long retryInterval) { + protected static RetryPolicy createRetryPolicy(Configuration conf, + long retryTime, long retryInterval, boolean isHAEnabled) { long rmConnectWaitMS = retryTime; long rmConnectionRetryIntervalMS = retryInterval; @@ -236,7 +240,7 @@ public class RMProxy { } // Handle HA case first - if (HAUtil.isHAEnabled(conf)) { + if (isHAEnabled) { final long failoverSleepBaseMs = conf.getLong( YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, rmConnectionRetryIntervalMS); @@ -287,6 +291,7 @@ public class RMProxy { exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); exceptionToPolicyMap.put(RetriableException.class, retryPolicy); exceptionToPolicyMap.put(SocketException.class, retryPolicy); + exceptionToPolicyMap.put(StandbyException.class, retryPolicy); // YARN-4288: local IOException is also possible. exceptionToPolicyMap.put(IOException.class, retryPolicy); // Not retry on remote IO exception. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java new file mode 100644 index 0000000000..dc8d19b954 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.MultiException; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * A FailoverProxyProvider implementation that technically does not "failover" + * per-se. It constructs a wrapper proxy that sends the request to ALL + * underlying proxies simultaneously. Each proxy inside the wrapper proxy will + * retry the corresponding target. It assumes the in an HA setup, there will + * be only one Active, and the active should respond faster than any configured + * standbys. Once it receives a response from any one of the configred proxies, + * outstanding requests to other proxies are immediately cancelled. + */ +public class RequestHedgingRMFailoverProxyProvider + extends ConfiguredRMFailoverProxyProvider { + + private static final Log LOG = + LogFactory.getLog(RequestHedgingRMFailoverProxyProvider.class); + + private volatile String successfulProxy = null; + private ProxyInfo wrappedProxy = null; + private Map nonRetriableProxy = new HashMap<>(); + + @Override + @SuppressWarnings("unchecked") + public void init(Configuration configuration, RMProxy rmProxy, + Class protocol) { + super.init(configuration, rmProxy, protocol); + Map> retriableProxies = new HashMap<>(); + + String originalId = HAUtil.getRMHAId(conf); + for (String rmId : rmServiceIds) { + conf.set(YarnConfiguration.RM_HA_ID, rmId); + nonRetriableProxy.put(rmId, super.getProxyInternal()); + + T proxy = createRetriableProxy(); + ProxyInfo pInfo = new ProxyInfo(proxy, rmId); + retriableProxies.put(rmId, pInfo); + } + conf.set(YarnConfiguration.RM_HA_ID, originalId); + + T proxyInstance = (T) Proxy.newProxyInstance( + RMRequestHedgingInvocationHandler.class.getClassLoader(), + new Class[] {protocol}, + new RMRequestHedgingInvocationHandler(retriableProxies)); + String combinedInfo = Arrays.toString(rmServiceIds); + wrappedProxy = new ProxyInfo(proxyInstance, combinedInfo); + LOG.info("Created wrapped proxy for " + combinedInfo); + } + + @SuppressWarnings("unchecked") + protected T createRetriableProxy() { + try { + // Create proxy that can retry exceptions properly. + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false); + InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); + T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } catch (IOException ioe) { + LOG.error("Unable to create proxy to the ResourceManager " + HAUtil + .getRMHAId(conf), ioe); + return null; + } + } + + class RMRequestHedgingInvocationHandler implements InvocationHandler { + + final private Map> allProxies; + + public RMRequestHedgingInvocationHandler( + Map> allProxies) { + this.allProxies = new HashMap<>(allProxies); + } + + protected Object invokeMethod(Object proxy, Method method, Object[] args) + throws Throwable { + try { + return method.invoke(proxy, args); + } catch (InvocationTargetException ex) { + throw ex.getCause(); + } + } + + /** + * Creates a Executor and invokes all proxies concurrently. + */ + @Override + public Object invoke(Object proxy, final Method method, + final Object[] args) throws Throwable { + if (successfulProxy != null) { + return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args); + } + + ExecutorService executor = null; + CompletionService completionService; + try { + Map, ProxyInfo> proxyMap = new HashMap<>(); + int numAttempts = 0; + executor = Executors.newFixedThreadPool(allProxies.size()); + completionService = new ExecutorCompletionService<>(executor); + for (final ProxyInfo pInfo : allProxies.values()) { + Callable c = new Callable() { + @Override public Object call() throws Exception { + return method.invoke(pInfo.proxy, args); + } + }; + proxyMap.put(completionService.submit(c), pInfo); + numAttempts++; + } + + Map badResults = new HashMap<>(); + while (numAttempts > 0) { + Future callResultFuture = completionService.take(); + String pInfo = proxyMap.get(callResultFuture).proxyInfo; + Object retVal; + try { + retVal = callResultFuture.get(); + successfulProxy = pInfo; + LOG.info("Invocation successful on [" + pInfo + "]"); + return retVal; + } catch (Exception ex) { + LOG.warn("Invocation returned exception on " + "[" + pInfo + "]"); + badResults.put(pInfo, ex); + numAttempts--; + } + } + + // At this point we should have All bad results (Exceptions) + // Or should have returned with successful result. + if (badResults.size() == 1) { + throw badResults.values().iterator().next(); + } else { + throw new MultiException(badResults); + } + } finally { + if (executor != null) { + executor.shutdownNow(); + } + } + } + } + + @Override + public ProxyInfo getProxy() { + return wrappedProxy; + } + + @Override + public void performFailover(T currentProxy) { + LOG.info("Connection lost, trying to fail over."); + successfulProxy = null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 90804b89f1..a8066c131c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -443,7 +444,8 @@ public class TestNodeStatusUpdater { @Override protected ResourceTracker getRMClient() throws IOException { - RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, + HAUtil.isHAEnabled(conf)); resourceTracker = (ResourceTracker) RetryProxy.create(ResourceTracker.class, new MyResourceTracker6(rmStartIntervalMS, rmNeverStart), @@ -476,7 +478,8 @@ public class TestNodeStatusUpdater { @Override protected ResourceTracker getRMClient() { - RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, + HAUtil.isHAEnabled(conf)); return (ResourceTracker) RetryProxy.create(ResourceTracker.class, resourceTracker, retryPolicy); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 68c9efdc32..630b7ef677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -457,6 +457,7 @@ public class MiniYARNCluster extends CompositeService { protected synchronized void serviceStart() throws Exception { startResourceManager(index); Configuration conf = resourceManagers[index].getConfig(); + LOG.info("Starting resourcemanager " + index); LOG.info("MiniYARN ResourceManager address: " + conf.get(YarnConfiguration.RM_ADDRESS)); LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils