From 5ddaf2e133d25a37b07f3a6c70362026d7e2a91c Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 18 May 2023 02:09:10 +0800 Subject: [PATCH] YARN-11493. [Federation] ConfiguredRMFailoverProxyProvider Supports Randomly Select an Router. (#5651) --- .../hadoop/yarn/conf/YarnConfiguration.java | 5 ++ .../client/TestRMFailoverProxyProvider.java | 75 ++++++++++++++++++- .../ConfiguredRMFailoverProxyProvider.java | 47 +++++++++++- .../src/main/resources/yarn-default.xml | 11 +++ 4 files changed, 136 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 88dcf49291..e8189a2b94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3966,6 +3966,11 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled"; public static final boolean DEFAULT_FEDERATION_ENABLED = false; + public static final String FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER = + FEDERATION_PREFIX + "failover.random.order"; + + public static final boolean DEFAULT_FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER = false; + public static final String FEDERATION_FAILOVER_ENABLED = FEDERATION_PREFIX + "failover.enabled"; public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java index b4fd175fae..25bd7a2200 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java @@ -31,8 +31,11 @@ import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; @@ -51,6 +54,8 @@ public class TestRMFailoverProxyProvider { private static final int RM2_PORT = 8031; private static final int RM3_PORT = 8033; + private static final int NUM_ITERATIONS = 50; + private Configuration conf; private class TestProxy extends Proxy implements Closeable { @@ -303,5 +308,73 @@ public class TestRMFailoverProxyProvider { .getProxy(any(YarnConfiguration.class), any(Class.class), eq(mockAdd3)); } + + @Test + public void testRandomSelectRouter() throws Exception { + + // We design a test case like this: + // We have three routers (router1, router2, and router3), + // we enable Federation mode and random selection mode. + // After iterating 50 times, since the selection is random, + // each router should be selected more than 0 times, + // and the sum of the number of times each router is selected should be equal to 50. 
+ + final AtomicInteger router1Count = new AtomicInteger(0); + final AtomicInteger router2Count = new AtomicInteger(0); + final AtomicInteger router3Count = new AtomicInteger(0); + + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setBoolean(YarnConfiguration.FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER, true); + conf.set(YarnConfiguration.RM_HA_IDS, "router0,router1,router2"); + + // Create a single mock Router proxy and mock an RMProxy + Proxy mockRouterProxy = new TestProxy((proxy, method, args) -> null); + + Class protocol = ApplicationClientProtocol.class; + RMProxy mockRMProxy = mock(RMProxy.class); + ConfiguredRMFailoverProxyProvider fpp = new ConfiguredRMFailoverProxyProvider<>(); + + // Generate a single mock Router address shared by all Router IDs. + // Default port of yarn RM + InetSocketAddress mockRouterAdd = new InetSocketAddress(RM1_PORT); + + // Mock RMProxy methods + when(mockRMProxy.getRMAddress(any(YarnConfiguration.class), + any(Class.class))).thenReturn(mockRouterAdd); + when(mockRMProxy.getProxy(any(YarnConfiguration.class), + any(Class.class), eq(mockRouterAdd))).thenReturn(mockRouterProxy); + + // Initialize failover proxy provider and get proxy from it. 
+ for (int i = 0; i < NUM_ITERATIONS; i++) { + fpp.init(conf, mockRMProxy, protocol); + FailoverProxyProvider.ProxyInfo proxy = fpp.getProxy(); + if ("router0".equals(proxy.proxyInfo)) { + router1Count.incrementAndGet(); + } + if ("router1".equals(proxy.proxyInfo)) { + router2Count.incrementAndGet(); + } + if ("router2".equals(proxy.proxyInfo)) { + router3Count.incrementAndGet(); + } + } + + // router1Count, router2Count and router3Count are each + // less than NUM_ITERATIONS + assertTrue(router1Count.get() < NUM_ITERATIONS); + assertTrue(router2Count.get() < NUM_ITERATIONS); + assertTrue(router3Count.get() < NUM_ITERATIONS); + + // router1Count, router2Count and router3Count are each + // greater than 0 + assertTrue(router1Count.get() > 0); + assertTrue(router2Count.get() > 0); + assertTrue(router3Count.get() > 0); + + // The total (router1Count + router2Count + router3Count) should equal NUM_ITERATIONS + int totalCount = router1Count.get() + router2Count.get() + router3Count.get(); + assertEquals(NUM_ITERATIONS, totalCount); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index 89c8753868..3deb49299f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -21,9 +21,12 @@ package org.apache.hadoop.yarn.client; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -57,7 +60,7 @@ public class ConfiguredRMFailoverProxyProvider this.protocol = protocol; this.rmProxy.checkAllowedProtocols(this.protocol); this.conf = new YarnConfiguration(configuration); - Collection rmIds = HAUtil.getRMHAIds(conf); + Collection rmIds = getRMIds(conf); this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); @@ -119,4 +122,46 @@ public class ConfiguredRMFailoverProxyProvider } } } + + /** + * Get the list of RM IDs. + * + * @param pConfiguration Configuration. + * @return the RM IDs; shuffled only if Federation and random order are both enabled. + */ + private Collection getRMIds(Configuration pConfiguration) { + boolean isFederationEnabled = HAUtil.isFederationEnabled(pConfiguration); + if (!isFederationEnabled) { + return HAUtil.getRMHAIds(pConfiguration); + } + return getRandomOrderByRandomFlag(pConfiguration); + } + + /** + * In YARN Federation mode, the Router is considered as an RM for the client. + * We want the client to be able to randomly + * select a Router and support failover when selecting a Router. + * The original code always started trying from the first + * Router when the client selected a Router, + * but this method will support random Router selection. + * + * For clusters that have not enabled Federation mode, the behavior remains unchanged. + * + * @param pConfiguration Configuration. + * @return the RM IDs in configured order, or shuffled if random order is enabled. + */ + private Collection getRandomOrderByRandomFlag(Configuration pConfiguration) { + Collection rmIds = HAUtil.getRMHAIds(pConfiguration); + boolean isRandomOrder = pConfiguration.getBoolean( + YarnConfiguration.FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER, + YarnConfiguration.DEFAULT_FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER); + // If the random option is not enabled, return the IDs in their configured order. + if (!isRandomOrder) { + return rmIds; + } + // If the random option is enabled, return a shuffled copy of the IDs. 
+ List rmIdList = new ArrayList<>(rmIds); + Collections.shuffle(rmIdList); + return rmIdList; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 52f8a124cb..5e11c6526e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5250,4 +5250,15 @@ org.apache.hadoop.yarn.server.federation.cache.FederationJCache + + + When YARN Federation mode is enabled, + setting this to true lets clients randomly choose a Router while still supporting failover. + By default, the configuration is set to false, + and clients start attempting from the first configured Router. + + yarn.federation.failover.random.order + false + +