From 37269261d1232bc71708f30c76193188258ef4bd Mon Sep 17 00:00:00 2001
From: Yiqun Lin
Date: Wed, 2 May 2018 14:49:39 +0800
Subject: [PATCH] HDFS-13488. RBF: Reject requests when a Router is
 overloaded. Contributed by Inigo Goiri.

---
 .../metrics/FederationRPCMBean.java           |   2 +
 .../metrics/FederationRPCMetrics.java         |  10 +
 .../FederationRPCPerformanceMonitor.java      |   5 +
 .../federation/router/RBFConfigKeys.java      |   3 +
 .../federation/router/RouterRpcClient.java    |  31 ++-
 .../federation/router/RouterRpcMonitor.java   |   6 +
 .../federation/router/RouterRpcServer.java    |  11 +-
 .../router/RouterSafeModeException.java       |  53 ----
 .../src/main/resources/hdfs-rbf-default.xml   |   9 +
 .../federation/FederationTestUtils.java       |   2 +-
 .../federation/StateStoreDFSCluster.java      |  28 ++
 .../TestRouterClientRejectOverload.java       | 243 ++++++++++++++++++
 .../router/TestRouterRPCClientRetries.java    |  51 +---
 .../federation/router/TestRouterSafemode.java |   3 +-
 14 files changed, 349 insertions(+), 108 deletions(-)
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 3e031fefec..973c3983f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -40,6 +40,8 @@ public interface FederationRPCMBean {
 
   long getProxyOpFailureStandby();
 
+  long getProxyOpFailureClientOverloaded();
+
   long getProxyOpNotImplemented();
 
   long getProxyOpRetries();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 94d3383d10..9ab4e5addb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -54,6 +54,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   private MutableCounterLong proxyOpFailureStandby;
   @Metric("Number of operations to hit a standby NN")
   private MutableCounterLong proxyOpFailureCommunicate;
+  @Metric("Number of operations to hit a client overloaded Router")
+  private MutableCounterLong proxyOpFailureClientOverloaded;
   @Metric("Number of operations not implemented")
   private MutableCounterLong proxyOpNotImplemented;
   @Metric("Number of operation retries")
@@ -118,6 +120,14 @@ public long getProxyOpFailureCommunicate() {
     return proxyOpFailureCommunicate.value();
   }
 
+  public void incrProxyOpFailureClientOverloaded() {
+    proxyOpFailureClientOverloaded.incr();
+  }
+
+  @Override
+  public long getProxyOpFailureClientOverloaded() {
+    return proxyOpFailureClientOverloaded.value();
+  }
 
   public void incrProxyOpNotImplemented() {
     proxyOpNotImplemented.incr();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
index 547ebb567d..2c2741e559 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -153,6 +153,11 @@ public void proxyOpFailureCommunicate() {
     metrics.incrProxyOpFailureCommunicate();
   }
 
+  @Override
+  public void proxyOpFailureClientOverloaded() {
+    metrics.incrProxyOpFailureClientOverloaded();
+  }
+
   @Override
   public void proxyOpNotImplemented() {
     metrics.incrProxyOpNotImplemented();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 170b87601d..363db20805 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -113,6 +113,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
       FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
   public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+  public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
+      FEDERATION_ROUTER_PREFIX + "client.reject.overload";
+  public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 513e867f06..3eb7241428 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -35,13 +35,16 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -98,7 +101,7 @@ public class RouterRpcClient {
   /** Connection pool to the Namenodes per user for performance. */
   private final ConnectionManager connectionManager;
   /** Service to run asynchronous calls. */
-  private final ExecutorService executorService;
+  private final ThreadPoolExecutor executorService;
   /** Retry policy for router -> NN communication. */
   private final RetryPolicy retryPolicy;
   /** Optional perf monitor. */
@@ -131,8 +134,16 @@ public RouterRpcClient(Configuration conf, String identifier,
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router Client-%d")
         .build();
-    this.executorService = Executors.newFixedThreadPool(
-        numThreads, threadFactory);
+    BlockingQueue<Runnable> workQueue;
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
+      workQueue = new ArrayBlockingQueue<>(numThreads);
+    } else {
+      workQueue = new LinkedBlockingQueue<>();
+    }
+    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
     this.rpcMonitor = monitor;
 
@@ -1106,6 +1117,16 @@ public Object call() throws Exception {
       }
 
       return results;
+    } catch (RejectedExecutionException e) {
+      if (rpcMonitor != null) {
+        rpcMonitor.proxyOpFailureClientOverloaded();
+      }
+      int active = executorService.getActiveCount();
+      int total = executorService.getMaximumPoolSize();
+      String msg = "Not enough client threads " + active + "/" + total;
+      LOG.error(msg);
+      throw new StandbyException(
+          "Router " + routerId + " is overloaded: " + msg);
     } catch (InterruptedException ex) {
       LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
       throw new IOException(
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
index df9aa11159..7af71af079 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -75,6 +75,12 @@ void init(
    */
   void proxyOpFailureCommunicate();
 
+  /**
+   * Failed to proxy an operation to a Namenode because the client was
+   * overloaded.
+   */
+  void proxyOpFailureClientOverloaded();
+
   /**
    * Failed to proxy an operation because it is not implemented.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 21f26d0938..6b466b8694 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -289,7 +289,6 @@ public RouterRpcServer(Configuration configuration, Router router,
     // We don't want the server to log the full stack trace for some exceptions
     this.rpcServer.addTerseExceptions(
         RemoteException.class,
-        StandbyException.class,
         SafeModeException.class,
         FileNotFoundException.class,
         FileAlreadyExistsException.class,
@@ -298,6 +297,9 @@ public RouterRpcServer(Configuration configuration, Router router,
         NotReplicatedYetException.class,
         IOException.class);
 
+    this.rpcServer.addSuppressedLoggingExceptions(
+        StandbyException.class);
+
     // The RPC-server port can be ephemeral... ensure we have the correct info
     InetSocketAddress listenAddress = this.rpcServer.getListenerAddress();
     this.rpcAddress = new InetSocketAddress(
@@ -413,7 +415,7 @@ public InetSocketAddress getRpcAddress() {
    * @throws UnsupportedOperationException If the operation is not supported.
    */
   protected void checkOperation(OperationCategory op, boolean supported)
-      throws RouterSafeModeException, UnsupportedOperationException {
+      throws StandbyException, UnsupportedOperationException {
     checkOperation(op);
 
     if (!supported) {
@@ -435,7 +437,7 @@ protected void checkOperation(OperationCategory op, boolean supported)
    * client requests.
    */
   protected void checkOperation(OperationCategory op)
-      throws RouterSafeModeException {
+      throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
       rpcMonitor.startOp();
@@ -459,7 +461,8 @@ protected void checkOperation(OperationCategory op)
       if (rpcMonitor != null) {
         rpcMonitor.routerFailureSafemode();
       }
-      throw new RouterSafeModeException(router.getRouterId(), op);
+      throw new StandbyException("Router " + router.getRouterId() +
+          " is in safe mode and cannot handle " + op + " requests");
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
deleted file mode 100644
index 7a78b5b733..0000000000
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.ipc.StandbyException;
-
-/**
- * Exception that the Router throws when it is in safe mode. This extends
- * {@link StandbyException} for the client to try another Router when it gets
- * this exception.
- */
-public class RouterSafeModeException extends StandbyException {
-
-  private static final long serialVersionUID = 453568188334993493L;
-
-  /** Identifier of the Router that generated this exception. */
-  private final String routerId;
-
-  /**
-   * Build a new Router safe mode exception.
-   * @param router Identifier of the Router.
-   * @param op Category of the operation (READ/WRITE).
-   */
-  public RouterSafeModeException(String router, OperationCategory op) {
-    super("Router " + router + " is in safe mode and cannot handle " + op
-        + " requests.");
-    this.routerId = router;
-  }
-
-  /**
-   * Get the id of the Router that generated this exception.
-   * @return Id of the Router that generated this exception.
-   */
-  public String getRouterId() {
-    return this.routerId;
-  }
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 92f899d5ec..8806cb27de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -431,4 +431,13 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.client.reject.overload</name>
+    <value>false</value>
+    <description>
+      Set to true to reject client requests when we run out of RPC client
+      threads.
+    </description>
+  </property>
+
 </configuration>
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index ed1428a947..ce320f462b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -59,7 +59,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.Whitebox;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
index bf63b18469..9d56f130c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
@@ -28,6 +28,10 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -37,6 +41,7 @@
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Test utility to mimic a federated HDFS cluster with a router and a state
@@ -145,4 +150,27 @@ public List<MountTable> generateMockMountTable() throws IOException {
     entries.add(entry);
     return entries;
   }
+
+  /**
+   * Get the client configuration which targets all the Routers. It uses the HA
+   * setup to fail over between them.
+   * @return Configuration for the client which uses two routers.
+   */
+  public Configuration getRouterClientConf() {
+    List<RouterContext> routers = getRouters();
+    Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
+    int i = 0;
+    List<String> names = new ArrayList<>(routers.size());
+    for (RouterContext routerContext : routers) {
+      String name = "r" + i++;
+      clientConf.set(
+          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
+          "localhost:" + routerContext.getRpcPort());
+      names.add(name);
+    }
+    clientConf.set(DFSUtil.addKeySuffixes(
+        HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
+        StringUtils.join(",", names));
+    return clientConf;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
new file mode 100644
index 0000000000..3c51e13182
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the Router overload control which rejects requests when the RPC client
+ * is overloaded. This feature is managed by
+ * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
+ */
+public class TestRouterClientRejectOverload {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
+
+  private StateStoreDFSCluster cluster;
+
+  @After
+  public void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void setupCluster(boolean overloadControl) throws Exception {
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .metrics()
+        .admin()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC client threads to overload the Router easily
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
+    // Overload control
+    routerConf.setBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);
+
+    // No need for datanodes as we use renewLease() for testing
+    cluster.setNumDatanodesPerNameservice(0);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+  }
+
+  @Test
+  public void testWithoutOverloadControl() throws Exception {
+    setupCluster(false);
+
+    // Nobody should get overloaded
+    testOverloaded(0);
+
+    // Set subcluster 0 as slow
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateSlowNamenode(nn0, 1);
+
+    // Nobody should get overloaded, but it will be really slow
+    testOverloaded(0);
+
+    // No rejected requests expected
+    for (RouterContext router : cluster.getRouters()) {
+      FederationRPCMetrics rpcMetrics =
+          router.getRouter().getRpcServer().getRPCMetrics();
+      assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
+    }
+  }
+
+  @Test
+  public void testOverloadControl() throws Exception {
+    setupCluster(true);
+
+    List<RouterContext> routers = cluster.getRouters();
+    FederationRPCMetrics rpcMetrics0 =
+        routers.get(0).getRouter().getRpcServer().getRPCMetrics();
+    FederationRPCMetrics rpcMetrics1 =
+        routers.get(1).getRouter().getRpcServer().getRPCMetrics();
+
+    // Nobody should get overloaded
+    testOverloaded(0);
+    assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
+    assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());
+
+    // Set subcluster 0 as slow
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateSlowNamenode(nn0, 1);
+
+    // The subcluster should be overloaded now and reject 4-5 requests
+    testOverloaded(4, 6);
+    assertTrue(rpcMetrics0.getProxyOpFailureClientOverloaded() +
+        rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4);
+
+    // Client using HA with 2 Routers
+    // A single Router gets overloaded, but 2 will handle it
+    Configuration clientConf = cluster.getRouterClientConf();
+
+    // Each Router should get a similar number of ops (>=8) out of 2*10
+    long iniProxyOps0 = rpcMetrics0.getProxyOps();
+    long iniProxyOps1 = rpcMetrics1.getProxyOps();
+    testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
+    long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
+    long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
+    assertEquals(2 * 10, proxyOps0 + proxyOps1);
+    assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
+    assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
+  }
+
+  private void testOverloaded(int expOverload) throws Exception {
+    testOverloaded(expOverload, expOverload);
+  }
+
+  private void testOverloaded(int expOverloadMin, int expOverloadMax)
+      throws Exception {
+    RouterContext routerContext = cluster.getRandomRouter();
+    URI address = routerContext.getFileSystemURI();
+    Configuration conf = new HdfsConfiguration();
+    testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
+  }
+
+  /**
+   * Test if the Router gets overloaded by submitting requests in parallel.
+   * We check how many requests got rejected at the end.
+   * @param expOverloadMin Min number of requests expected as overloaded.
+   * @param expOverloadMax Max number of requests expected as overloaded.
+   * @param address Destination address.
+   * @param conf Configuration of the client.
+   * @param numOps Number of operations to submit.
+   * @throws Exception If it cannot perform the test.
+   */
+  private void testOverloaded(int expOverloadMin, int expOverloadMax,
+      final URI address, final Configuration conf, final int numOps)
+      throws Exception {
+
+    // Submit renewLease() ops which go to all subclusters
+    final AtomicInteger overloadException = new AtomicInteger();
+    ExecutorService exec = Executors.newFixedThreadPool(numOps);
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < numOps; i++) {
+      // Stagger the operations a little (50ms)
+      final int sleepTime = i * 50;
+      Future<?> future = exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          DFSClient routerClient = null;
+          try {
+            Thread.sleep(sleepTime);
+            routerClient = new DFSClient(address, conf);
+            String clientName = routerClient.getClientName();
+            ClientProtocol routerProto = routerClient.getNamenode();
+            routerProto.renewLease(clientName);
+          } catch (RemoteException re) {
+            IOException ioe = re.unwrapRemoteException();
+            assertTrue("Wrong exception: " + ioe,
+                ioe instanceof StandbyException);
+            assertExceptionContains("is overloaded", ioe);
+            overloadException.incrementAndGet();
+          } catch (IOException e) {
+            fail("Unexpected exception: " + e);
+          } catch (InterruptedException e) {
+            fail("Cannot sleep: " + e);
+          } finally {
+            if (routerClient != null) {
+              try {
+                routerClient.close();
+              } catch (IOException e) {
+                LOG.error("Cannot close the client");
+              }
+            }
+          }
+        }
+      });
+      futures.add(future);
+    }
+    // Wait until all the requests are done
+    while (!futures.isEmpty()) {
+      futures.remove(0).get();
+    }
+    exec.shutdown();
+
+    int num = overloadException.get();
+    if (expOverloadMin == expOverloadMax) {
+      assertEquals(expOverloadMin, num);
+    } else {
+      assertTrue("Expected >=" + expOverloadMin + " but was " + num,
+          num >= expOverloadMin);
+      assertTrue("Expected <=" + expOverloadMax + " but was " + num,
+          num <= expOverloadMax);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 372dd3beee..e5ab3ab277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.List;
@@ -44,13 +44,8 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.Whitebox;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.After;
@@ -58,10 +53,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Supplier;
 
@@ -70,9 +61,6 @@
  */
 public class TestRouterRPCClientRetries {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
-
   private static StateStoreDFSCluster cluster;
   private static NamenodeContext nnContext1;
   private static RouterContext routerContext;
@@ -144,7 +132,7 @@ public void testRetryWhenAllNameServiceDown() throws Exception {
       fail("Should have thrown RemoteException error.");
     } catch (RemoteException e) {
       String ns0 = cluster.getNameservices().get(0);
-      GenericTestUtils.assertExceptionContains(
+      assertExceptionContains(
           "No namenode available under nameservice " + ns0, e);
     }
 
@@ -212,14 +200,14 @@ public void testNamenodeMetricsSlow() throws Exception {
     // Making subcluster0 slow to reply, should only get DNs from nn1
     MiniDFSCluster dfsCluster = cluster.getCluster();
     NameNode nn0 = dfsCluster.getNameNode(0);
-    simulateNNSlow(nn0);
+    simulateSlowNamenode(nn0, 3);
     waitUpdateLiveNodes(jsonString2, metrics);
     final String jsonString3 = metrics.getLiveNodes();
     assertEquals(2, getNumDatanodes(jsonString3));
 
     // Making subcluster1 slow to reply, shouldn't get any DNs
     NameNode nn1 = dfsCluster.getNameNode(1);
-    simulateNNSlow(nn1);
+    simulateSlowNamenode(nn1, 3);
     waitUpdateLiveNodes(jsonString3, metrics);
     final String jsonString4 = metrics.getLiveNodes();
     assertEquals(0, getNumDatanodes(jsonString4));
@@ -249,36 +237,11 @@ private static int getNumDatanodes(final String jsonString)
   private static void waitUpdateLiveNodes(
       final String oldValue, final NamenodeBeanMetrics metrics)
       throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+    waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         return !oldValue.equals(metrics.getLiveNodes());
       }
     }, 500, 5 * 1000);
   }
-
-  /**
-   * Simulate that a Namenode is slow by adding a sleep to the check operation
-   * in the NN.
-   * @param nn Namenode to simulate slow.
-   * @throws Exception If we cannot add the sleep time.
-   */
-  private static void simulateNNSlow(final NameNode nn) throws Exception {
-    FSNamesystem namesystem = nn.getNamesystem();
-    HAContext haContext = namesystem.getHAContext();
-    HAContext spyHAContext = spy(haContext);
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        LOG.info("Simulating slow namenode {}", invocation.getMock());
-        try {
-          Thread.sleep(3 * 1000);
-        } catch(InterruptedException e) {
-          LOG.error("Simulating a slow namenode aborted");
-        }
-        return null;
-      }
-    }).when(spyHAContext).checkOperation(any(OperationCategory.class));
-    Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
index e5d8348da3..f16ceb58f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.Time;
 
 import org.junit.After;
@@ -187,7 +188,7 @@ public void testRouterRpcSafeMode()
     try {
       router.getRpcServer().delete("/testfile.txt", true);
      fail("We should have thrown a safe mode exception");
-    } catch (RouterSafeModeException sme) {
+    } catch (StandbyException sme) {
       exception = true;
     }
     assertTrue("We should have thrown a safe mode exception", exception);
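
Note on the rejection path in RouterRpcClient above: with dfs.federation.router.client.reject.overload enabled, the executor is built with a bounded ArrayBlockingQueue and the default ThreadPoolExecutor.AbortPolicy, so submit() throws RejectedExecutionException once all workers are busy and the queue is full; the catch block then maps that to a StandbyException so an HA client fails over to another Router. The following is a minimal, self-contained sketch of that standard JDK behavior, not code from the patch; the class name, pool size, and task count are illustrative assumptions.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch: why a bounded work queue causes rejections. */
public class BoundedPoolSketch {
  public static void main(String[] args) {
    int numThreads = 2;
    // Same construction pattern as RouterRpcClient with overload control on:
    // fixed-size pool, bounded queue, default AbortPolicy rejection handler.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(numThreads, numThreads,
        0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(numThreads));
    int rejected = 0;
    for (int i = 0; i < 10; i++) {
      try {
        pool.submit(() -> {
          try {
            Thread.sleep(1000); // stand-in for a slow Namenode call
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException e) {
        // RouterRpcClient catches this and rethrows it as a StandbyException
        rejected++;
      }
    }
    // 2 running + 2 queued are accepted; the remaining 6 are rejected
    System.out.println("Rejected " + rejected + " of 10 submissions");
    pool.shutdown();
  }
}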