HDFS-13750. RBF: Router ID in RouterRpcClient is always null. Contributed by Takanobu Asanuma.
This commit is contained in:
parent
e3d73bbc24
commit
01ff817814
@ -92,8 +92,8 @@ public class RouterRpcClient {
|
||||
LoggerFactory.getLogger(RouterRpcClient.class);
|
||||
|
||||
|
||||
/** Router identifier. */
|
||||
private final String routerId;
|
||||
/** Router using this RPC client. */
|
||||
private final Router router;
|
||||
|
||||
/** Interface to identify the active NN for a nameservice or blockpool ID. */
|
||||
private final ActiveNamenodeResolver namenodeResolver;
|
||||
@ -116,12 +116,13 @@ public class RouterRpcClient {
|
||||
* Create a router RPC client to manage remote procedure calls to NNs.
|
||||
*
|
||||
* @param conf Hdfs Configuation.
|
||||
* @param router A router using this RPC client.
|
||||
* @param resolver A NN resolver to determine the currently active NN in HA.
|
||||
* @param monitor Optional performance monitor.
|
||||
*/
|
||||
public RouterRpcClient(Configuration conf, String identifier,
|
||||
public RouterRpcClient(Configuration conf, Router router,
|
||||
ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
|
||||
this.routerId = identifier;
|
||||
this.router = router;
|
||||
|
||||
this.namenodeResolver = resolver;
|
||||
|
||||
@ -343,7 +344,8 @@ private Object invokeMethod(
|
||||
|
||||
if (namenodes == null || namenodes.isEmpty()) {
|
||||
throw new IOException("No namenodes to invoke " + method.getName() +
|
||||
" with params " + Arrays.toString(params) + " from " + this.routerId);
|
||||
" with params " + Arrays.toString(params) + " from "
|
||||
+ router.getRouterId());
|
||||
}
|
||||
|
||||
Object ret = null;
|
||||
@ -1126,7 +1128,7 @@ public Object call() throws Exception {
|
||||
String msg = "Not enough client threads " + active + "/" + total;
|
||||
LOG.error(msg);
|
||||
throw new StandbyException(
|
||||
"Router " + routerId + " is overloaded: " + msg);
|
||||
"Router " + router.getRouterId() + " is overloaded: " + msg);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
|
||||
throw new IOException(
|
||||
@ -1150,7 +1152,7 @@ private List<? extends FederationNamenodeContext> getNamenodesForNameservice(
|
||||
|
||||
if (namenodes == null || namenodes.isEmpty()) {
|
||||
throw new IOException("Cannot locate a registered namenode for " + nsId +
|
||||
" from " + this.routerId);
|
||||
" from " + router.getRouterId());
|
||||
}
|
||||
return namenodes;
|
||||
}
|
||||
@ -1171,7 +1173,7 @@ private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
|
||||
|
||||
if (namenodes == null || namenodes.isEmpty()) {
|
||||
throw new IOException("Cannot locate a registered namenode for " + bpId +
|
||||
" from " + this.routerId);
|
||||
" from " + router.getRouterId());
|
||||
}
|
||||
return namenodes;
|
||||
}
|
||||
|
@ -196,6 +196,7 @@ public class RouterRpcServer extends AbstractService
|
||||
* Construct a router RPC server.
|
||||
*
|
||||
* @param configuration HDFS Configuration.
|
||||
* @param router A router using this RPC server.
|
||||
* @param nnResolver The NN resolver instance to determine active NNs in HA.
|
||||
* @param fileResolver File resolver to resolve file paths to subclusters.
|
||||
* @throws IOException If the RPC server could not be created.
|
||||
@ -291,7 +292,7 @@ public RouterRpcServer(Configuration configuration, Router router,
|
||||
this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf);
|
||||
|
||||
// Create the client
|
||||
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
|
||||
this.rpcClient = new RouterRpcClient(this.conf, this.router,
|
||||
this.namenodeResolver, this.rpcMonitor);
|
||||
|
||||
// Initialize modules
|
||||
|
@ -129,6 +129,9 @@ public void updateActiveNamenode(
|
||||
// Return a copy of the list because it is updated periodically
|
||||
List<? extends FederationNamenodeContext> namenodes =
|
||||
this.resolver.get(nameserviceId);
|
||||
if (namenodes == null) {
|
||||
namenodes = new ArrayList<>();
|
||||
}
|
||||
return Collections.unmodifiableList(new ArrayList<>(namenodes));
|
||||
}
|
||||
|
||||
|
@ -18,9 +18,11 @@
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -185,4 +187,20 @@ public void testRouterRpcWithNoSubclusters() throws IOException {
|
||||
router.stop();
|
||||
router.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouterIDInRouterRpcClient() throws Exception {
|
||||
|
||||
Router router = new Router();
|
||||
router.init(new RouterConfigBuilder(conf).rpc().build());
|
||||
router.setRouterId("Router-0");
|
||||
RemoteMethod remoteMethod = mock(RemoteMethod.class);
|
||||
|
||||
intercept(IOException.class, "Router-0",
|
||||
() -> router.getRpcServer().getRPCClient()
|
||||
.invokeSingle("ns0", remoteMethod));
|
||||
|
||||
router.stop();
|
||||
router.close();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user