Patch summary (informational only; patch tools skip text before the first "diff --git" header).
  RouterRpcServer.invokeAtAvailableNs: tries the default namespace when one is configured. If that
  call fails with an unavailable-subcluster exception, the default is removed from the candidate
  set and the remaining namespaces are tried; any other IOException is rethrown unchanged.
  RouterRpcServer.invokeOnNs: now iterates over the given namespaces in order instead of invoking
  only the first one, skipping namespaces that fail with an unavailable-subcluster exception and
  throwing the supplied IOException when none responds. The getNameSpaceInfo helper and the
  java.util.HashSet import are removed.
  Test: the cluster grows from two to three nameservices and the availability test shuts down
  two namenodes before calling invokeAtAvailableNs.
NOTE(review): line breaks, indentation and generic type arguments (<T>, Class<T>,
  Set<FederationNamespaceInfo>, List<String>) were reconstructed from a whitespace-collapsed copy;
  hunk line counts were used to validate the line structure. Confirm against the original commit.

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 48d4ff0d6c..2b6c4a1f2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -43,7 +43,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -671,8 +670,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
   /**
    * Invokes the method at default namespace, if default namespace is not
-   * available then at the first available namespace.
-   * If the namespace is unavailable, retry once with other namespace.
+   * available then at the other available namespaces.
+   * If the namespace is unavailable, retry with other namespaces.
    * @param <T> expected return type.
    * @param method the remote method.
    * @return the response received after invoking method.
@@ -681,28 +680,29 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
       throws IOException {
     String nsId = subclusterResolver.getDefaultNamespace();
-    // If default Ns is not present return result from first namespace.
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    try {
-      if (!nsId.isEmpty()) {
+    // If no namespace is available, then throw this IOException.
+    IOException io = new IOException("No namespace available.");
+    // If default Ns is present return result from that namespace.
+    if (!nsId.isEmpty()) {
+      try {
         return rpcClient.invokeSingle(nsId, method, clazz);
+      } catch (IOException ioe) {
+        if (!clientProto.isUnavailableSubclusterException(ioe)) {
+          LOG.debug("{} exception cannot be retried",
+              ioe.getClass().getSimpleName());
+          throw ioe;
+        }
+        // Remove the already tried namespace.
+        nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+        return invokeOnNs(method, clazz, io, nss);
       }
-      // If no namespace is available, throw IOException.
-      IOException io = new IOException("No namespace available.");
-      return invokeOnNs(method, clazz, io, nss);
-    } catch (IOException ioe) {
-      if (!clientProto.isUnavailableSubclusterException(ioe)) {
-        LOG.debug("{} exception cannot be retried",
-            ioe.getClass().getSimpleName());
-        throw ioe;
-      }
-      Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nss, nsId);
-      return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
     }
+    return invokeOnNs(method, clazz, io, nss);
   }
 
   /**
-   * Invoke the method on first available namespace,
+   * Invoke the method sequentially on available namespaces,
    * throw no namespace available exception, if no namespaces are available.
    * @param method the remote method.
    * @param clazz Class for the return type.
@@ -716,26 +716,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     if (nss.isEmpty()) {
       throw ioe;
     }
-    String nsId = nss.iterator().next().getNameserviceId();
-    return rpcClient.invokeSingle(nsId, method, clazz);
-  }
-
-  /**
-   * Get set of namespace info's removing the already invoked namespaceinfo.
-   * @param nss List of namespaces in the federation.
-   * @param nsId Already invoked namespace id.
-   * @return List of name spaces in the federation on
-   * removing the already invoked namespaceinfo.
-   */
-  private static Set<FederationNamespaceInfo> getNameSpaceInfo(
-      final Set<FederationNamespaceInfo> nss, final String nsId) {
-    Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
-    for (FederationNamespaceInfo ns : nss) {
-      if (!nsId.equals(ns.getNameserviceId())) {
-        namespaceInfos.add(ns);
+    for (FederationNamespaceInfo fnInfo : nss) {
+      String nsId = fnInfo.getNameserviceId();
+      LOG.debug("Invoking {} on namespace {}", method, nsId);
+      try {
+        return rpcClient.invokeSingle(nsId, method, clazz);
+      } catch (IOException e) {
+        LOG.debug("Failed to invoke {} on namespace {}", method, nsId, e);
+        // Ignore the exception and try on other namespace, if the tried
+        // namespace is unavailable, else throw the received exception.
+        if (!clientProto.isUnavailableSubclusterException(e)) {
+          throw e;
+        }
       }
     }
-    return namespaceInfos;
+    // Couldn't get a response from any of the namespace, throw ioe.
+    throw ioe;
   }
 
   @Override // ClientProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
index ee92ec4f77..aa29e8d15e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
@@ -76,13 +76,14 @@ import org.junit.Test;
  * Tests router rpc with multiple destination mount table resolver.
  */
 public class TestRouterRPCMultipleDestinationMountTableResolver {
-  private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1");
+  private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2");
 
   private static StateStoreDFSCluster cluster;
   private static RouterContext routerContext;
   private static MountTableResolver resolver;
   private static DistributedFileSystem nnFs0;
   private static DistributedFileSystem nnFs1;
+  private static DistributedFileSystem nnFs2;
   private static DistributedFileSystem routerFs;
   private static RouterRpcServer rpcServer;
 
@@ -90,7 +91,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
   public static void setUp() throws Exception {
 
     // Build and start a federated cluster
-    cluster = new StateStoreDFSCluster(false, 2,
+    cluster = new StateStoreDFSCluster(false, 3,
         MultipleDestinationMountTableResolver.class);
     Configuration routerConf =
         new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
@@ -111,6 +112,8 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
         .getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
     nnFs1 = (DistributedFileSystem) cluster
         .getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
+    nnFs2 = (DistributedFileSystem) cluster
+        .getNamenode(cluster.getNameservices().get(2), null).getFileSystem();
     routerFs = (DistributedFileSystem) routerContext.getFileSystem();
     rpcServer =routerContext.getRouter().getRpcServer();
   }
@@ -668,6 +671,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
     // Make one subcluster unavailable.
     MiniDFSCluster dfsCluster = cluster.getCluster();
     dfsCluster.shutdownNameNode(0);
+    dfsCluster.shutdownNameNode(1);
     try {
       // Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
       RemoteMethod method = new RemoteMethod("getServerDefaults");
@@ -675,7 +679,8 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
           rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
       assertNotNull(serverDefaults);
     } finally {
-      dfsCluster.restartNameNode(0);
+      dfsCluster.restartNameNode(0, false);
+      dfsCluster.restartNameNode(1);
     }
   }
 
@@ -893,6 +898,9 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
     if (nsId.equals("ns1")) {
       return nnFs1;
     }
+    if (nsId.equals("ns2")) {
+      return nnFs2;
+    }
     return null;
   }
 