diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 3eb7241428..56ca55ff6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -92,8 +92,8 @@ public class RouterRpcClient { LoggerFactory.getLogger(RouterRpcClient.class); - /** Router identifier. */ - private final String routerId; + /** Router using this RPC client. */ + private final Router router; /** Interface to identify the active NN for a nameservice or blockpool ID. */ private final ActiveNamenodeResolver namenodeResolver; @@ -116,12 +116,13 @@ public class RouterRpcClient { * Create a router RPC client to manage remote procedure calls to NNs. * * @param conf Hdfs Configuation. + * @param router A router using this RPC client. * @param resolver A NN resolver to determine the currently active NN in HA. * @param monitor Optional performance monitor. */ - public RouterRpcClient(Configuration conf, String identifier, + public RouterRpcClient(Configuration conf, Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { - this.routerId = identifier; + this.router = router; this.namenodeResolver = resolver; @@ -343,7 +344,8 @@ private Object invokeMethod( if (namenodes == null || namenodes.isEmpty()) { throw new IOException("No namenodes to invoke " + method.getName() + - " with params " + Arrays.toString(params) + " from " + this.routerId); + " with params " + Arrays.toString(params) + " from " + + router.getRouterId()); } Object ret = null; @@ -1126,7 +1128,7 @@ public Object call() throws Exception { String msg = "Not enough client threads " + active + "/" + total; LOG.error(msg); throw new StandbyException( - "Router " + routerId + " is overloaded: " + msg); + "Router " + router.getRouterId() + " is overloaded: " + msg); } catch (InterruptedException ex) { LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); throw new IOException( @@ -1150,7 +1152,7 @@ private List getNamenodesForNameservice( if (namenodes == null || namenodes.isEmpty()) { throw new IOException("Cannot locate a registered namenode for " + nsId + - " from " + this.routerId); + " from " + router.getRouterId()); } return namenodes; } @@ -1171,7 +1173,7 @@ private List getNamenodesForBlockPoolId( if (namenodes == null || namenodes.isEmpty()) { throw new IOException("Cannot locate a registered namenode for " + bpId + - " from " + this.routerId); + " from " + router.getRouterId()); } return namenodes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index fe5499366f..2deda9ffff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -196,6 +196,7 @@ public class RouterRpcServer extends AbstractService * Construct a router RPC server. * * @param configuration HDFS Configuration. + * @param router A router using this RPC server. * @param nnResolver The NN resolver instance to determine active NNs in HA. * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. @@ -291,7 +292,7 @@ public RouterRpcServer(Configuration configuration, Router router, this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); // Create the client - this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), + this.rpcClient = new RouterRpcClient(this.conf, this.router, this.namenodeResolver, this.rpcMonitor); // Initialize modules diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 36cce391ae..f5636ceccd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -129,6 +129,9 @@ public void updateActiveNamenode( // Return a copy of the list because it is updated periodically List namenodes = this.resolver.get(nameserviceId); + if (namenodes == null) { + namenodes = new ArrayList<>(); + } return Collections.unmodifiableList(new ArrayList<>(namenodes)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java index f8cf009479..db4be292fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.IOException; import java.net.InetSocketAddress; @@ -185,4 +187,20 @@ public void testRouterRpcWithNoSubclusters() throws IOException { router.stop(); router.close(); } + + @Test + public void testRouterIDInRouterRpcClient() throws Exception { + + Router router = new Router(); + router.init(new RouterConfigBuilder(conf).rpc().build()); + router.setRouterId("Router-0"); + RemoteMethod remoteMethod = mock(RemoteMethod.class); + + intercept(IOException.class, "Router-0", + () -> router.getRpcServer().getRPCClient() + .invokeSingle("ns0", remoteMethod)); + + router.stop(); + router.close(); + } }