diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 2a87146011..6652cb26d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -201,8 +201,7 @@ public FsServerDefaults getServerDefaults() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); RemoteMethod method = new RemoteMethod("getServerDefaults"); - String ns = subclusterResolver.getDefaultNamespace(); - return (FsServerDefaults) rpcClient.invokeSingle(ns, method); + return rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java index bf0db6e725..c6b020977d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -45,14 +44,11 @@ public class RouterNamenodeProtocol implements NamenodeProtocol { private final RouterRpcServer rpcServer; /** RPC clients to connect to the Namenodes. */ private final RouterRpcClient rpcClient; - /** Interface to map global name space to HDFS subcluster name spaces. */ - private final FileSubclusterResolver subclusterResolver; public RouterNamenodeProtocol(RouterRpcServer server) { this.rpcServer = server; this.rpcClient = this.rpcServer.getRPCClient(); - this.subclusterResolver = this.rpcServer.getSubclusterResolver(); } @Override @@ -94,33 +90,27 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, public ExportedBlockKeys getBlockKeys() throws IOException { rpcServer.checkOperation(OperationCategory.READ); - // We return the information from the default name space - String defaultNsId = subclusterResolver.getDefaultNamespace(); RemoteMethod method = new RemoteMethod(NamenodeProtocol.class, "getBlockKeys"); - return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class); + return rpcServer.invokeAtAvailableNs(method, ExportedBlockKeys.class); } @Override public long getTransactionID() throws IOException { rpcServer.checkOperation(OperationCategory.READ); - // We return the information from the default name space - String defaultNsId = subclusterResolver.getDefaultNamespace(); RemoteMethod method = new RemoteMethod(NamenodeProtocol.class, "getTransactionID"); - return rpcClient.invokeSingle(defaultNsId, method, long.class); + return rpcServer.invokeAtAvailableNs(method, long.class); } @Override public long getMostRecentCheckpointTxId() throws IOException { rpcServer.checkOperation(OperationCategory.READ); - // We return the information from the default name space - String defaultNsId = subclusterResolver.getDefaultNamespace(); RemoteMethod method = new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId"); - return rpcClient.invokeSingle(defaultNsId, method, long.class); + return rpcServer.invokeAtAvailableNs(method, long.class); } @Override @@ -133,11 +123,9 @@ public CheckpointSignature rollEditLog() throws IOException { public NamespaceInfo versionRequest() throws IOException { rpcServer.checkOperation(OperationCategory.READ); - // We return the information from the default name space - String defaultNsId = subclusterResolver.getDefaultNamespace(); RemoteMethod method = new RemoteMethod(NamenodeProtocol.class, "versionRequest"); - return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class); + return rpcServer.invokeAtAvailableNs(method, NamespaceInfo.class); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 0d4f94c5a2..be6a9b03c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -479,6 +479,29 @@ static String getMethodName() { return methodName; } + /** + * Invokes the method at default namespace, if default namespace is not + * available then at the first available namespace. + * @param expected return type. + * @param method the remote method. + * @return the response received after invoking method. + * @throws IOException + */ + T invokeAtAvailableNs(RemoteMethod method, Class clazz) + throws IOException { + String nsId = subclusterResolver.getDefaultNamespace(); + if (!nsId.isEmpty()) { + return rpcClient.invokeSingle(nsId, method, clazz); + } + // If default Ns is not present return result from first namespace. + Set nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + throw new IOException("No namespace availaible."); + } + nsId = nss.iterator().next().getNameserviceId(); + return rpcClient.invokeSingle(nsId, method, clazz); + } + @Override // ClientProtocol public Token getDelegationToken(Text renewer) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java index 7145940cca..8a55b9a6fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.federation.router; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -36,13 +35,10 @@ public class RouterStoragePolicy { private final RouterRpcServer rpcServer; /** RPC clients to connect to the Namenodes. */ private final RouterRpcClient rpcClient; - /** Interface to map global name space to HDFS subcluster name spaces. */ - private final FileSubclusterResolver subclusterResolver; public RouterStoragePolicy(RouterRpcServer server) { this.rpcServer = server; this.rpcClient = this.rpcServer.getRPCClient(); - this.subclusterResolver = this.rpcServer.getSubclusterResolver(); } public void setStoragePolicy(String src, String policyName) @@ -61,8 +57,7 @@ public BlockStoragePolicy[] getStoragePolicies() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); RemoteMethod method = new RemoteMethod("getStoragePolicies"); - String ns = subclusterResolver.getDefaultNamespace(); - return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method); + return rpcServer.invokeAtAvailableNs(method, BlockStoragePolicy[].class); } public void unsetStoragePolicy(String src) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 9bff00732e..cdeab46938 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -57,6 +57,7 @@ public class MockResolver private Map> locations = new HashMap<>(); private Set namespaces = new HashSet<>(); private String defaultNamespace = null; + private boolean disableDefaultNamespace = false; public MockResolver() { this.cleanRegistrations(); @@ -322,8 +323,19 @@ public List getMountPoints(String path) throws IOException { public void setRouterId(String router) { } + /** + * Mocks the availability of default namespace. + * @param b if true default namespace is unset. + */ + public void setDisableNamespace(boolean b) { + this.disableDefaultNamespace = b; + } + @Override public String getDefaultNamespace() { + if (disableDefaultNamespace) { + return ""; + } return defaultNamespace; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 760d755cb8..2d26e1142e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -828,6 +829,40 @@ public void testProxyGetAndUnsetStoragePolicy() throws Exception { assertEquals(nnPolicy.getId(), policy.getId()); } + @Test + public void testListStoragePolicies() throws IOException, URISyntaxException { + MockResolver resolver = + (MockResolver) router.getRouter().getSubclusterResolver(); + try { + // Check with default namespace specified. + BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies(); + assertArrayEquals(policies, routerProtocol.getStoragePolicies()); + // Check with default namespace unspecified. + resolver.setDisableNamespace(true); + assertArrayEquals(policies, routerProtocol.getStoragePolicies()); + } finally { + resolver.setDisableNamespace(false); + } + } + + @Test + public void testGetServerDefaults() throws IOException, URISyntaxException { + MockResolver resolver = + (MockResolver) router.getRouter().getSubclusterResolver(); + try { + // Check with default namespace specified. + FsServerDefaults defaults = namenode.getClient().getServerDefaults(); + assertEquals(defaults.getBlockSize(), + routerProtocol.getServerDefaults().getBlockSize()); + // Check with default namespace unspecified. + resolver.setDisableNamespace(true); + assertEquals(defaults.getBlockSize(), + routerProtocol.getServerDefaults().getBlockSize()); + } finally { + resolver.setDisableNamespace(false); + } + } + @Test public void testProxyGetPreferedBlockSize() throws Exception { @@ -1012,8 +1047,23 @@ public void testProxyGetFileInfoAcessException() throws IOException { @Test public void testProxyVersionRequest() throws Exception { - NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest(); - NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest(); + MockResolver resolver = + (MockResolver) router.getRouter().getSubclusterResolver(); + try { + // Check with default namespace specified. + NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest(); + NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest(); + compareVersion(rVersion, nnVersion); + // Check with default namespace unspecified. + resolver.setDisableNamespace(true); + rVersion = routerNamenodeProtocol.versionRequest(); + compareVersion(rVersion, nnVersion); + } finally { + resolver.setDisableNamespace(false); + } + } + + private void compareVersion(NamespaceInfo rVersion, NamespaceInfo nnVersion) { assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID()); assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID()); assertEquals(nnVersion.getClusterID(), rVersion.getClusterID()); @@ -1023,8 +1073,24 @@ public void testProxyVersionRequest() throws Exception { @Test public void testProxyGetBlockKeys() throws Exception { - ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys(); - ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys(); + MockResolver resolver = + (MockResolver) router.getRouter().getSubclusterResolver(); + try { + // Check with default namespace specified. + ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys(); + ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys(); + compareBlockKeys(rKeys, nnKeys); + // Check with default namespace unspecified. + resolver.setDisableNamespace(true); + rKeys = routerNamenodeProtocol.getBlockKeys(); + compareBlockKeys(rKeys, nnKeys); + } finally { + resolver.setDisableNamespace(false); + } + } + + private void compareBlockKeys(ExportedBlockKeys rKeys, + ExportedBlockKeys nnKeys) { assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey()); assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval()); assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime()); @@ -1054,17 +1120,38 @@ public void testProxyGetBlocks() throws Exception { @Test public void testProxyGetTransactionID() throws IOException { - long routerTransactionID = routerNamenodeProtocol.getTransactionID(); - long nnTransactionID = nnNamenodeProtocol.getTransactionID(); - assertEquals(nnTransactionID, routerTransactionID); + MockResolver resolver = + (MockResolver) router.getRouter().getSubclusterResolver(); + try { + // Check with default namespace specified. + long routerTransactionID = routerNamenodeProtocol.getTransactionID(); + long nnTransactionID = nnNamenodeProtocol.getTransactionID(); + assertEquals(nnTransactionID, routerTransactionID); + // Check with default namespace unspecified. + resolver.setDisableNamespace(true); + routerTransactionID = routerNamenodeProtocol.getTransactionID(); + assertEquals(nnTransactionID, routerTransactionID); + } finally { + resolver.setDisableNamespace(false); + } } @Test public void testProxyGetMostRecentCheckpointTxId() throws IOException { - long routerCheckPointId = - routerNamenodeProtocol.getMostRecentCheckpointTxId(); - long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId(); - assertEquals(nnCheckPointId, routerCheckPointId); + MockResolver resolver = + (MockResolver) router.getRouter().getSubclusterResolver(); + try { + // Check with default namespace specified. + long routerCheckPointId = + routerNamenodeProtocol.getMostRecentCheckpointTxId(); + long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId(); + assertEquals(nnCheckPointId, routerCheckPointId); + // Check with default namespace unspecified. + resolver.setDisableNamespace(true); + routerCheckPointId = routerNamenodeProtocol.getMostRecentCheckpointTxId(); + } finally { + resolver.setDisableNamespace(false); + } } @Test