diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
index cae1f47860..de89a152c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
@@ -146,4 +146,14 @@ List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
    * @param routerId Unique string identifier for the router.
    */
   void setRouterId(String routerId);
+
+  /**
+   * Rotate the cache so that the given namenode is moved to the lowest
+   * priority, ensuring it will not be accessed first next time.
+   *
+   * @param nsId name service id
+   * @param namenode namenode context to deprioritize
+   * @param listObserversFirst true for the observer read case, where observer NNs are ranked first
+   */
+  void rotateCache(String nsId, FederationNamenodeContext namenode, boolean listObserversFirst);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
index db1dcdf181..c0e800e043 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -478,4 +478,45 @@ private List<MembershipState> getRecentRegistrationForQuery(
   public void setRouterId(String router) {
     this.routerId = router;
   }
+
+  /**
+   * Rotate the cache so that the given namenode is moved to the lowest
+   * priority, ensuring it will not be accessed first next time.
+   *
+   * @param nsId name service id
+   * @param namenode namenode context to deprioritize
+   * @param listObserversFirst true for the observer read case, where observer NNs are ranked first
+   */
+  @Override
+  public void rotateCache(
+      String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
+    cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> {
+      if (namenodeContexts == null || namenodeContexts.size() <= 1) {
+        return namenodeContexts;
+      }
+      FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
+      /*
+       * If the first nn in the cache is active, its priority must not be
+       * lowered. This happens when another thread has already updated the cache.
+       */
+      if (firstNamenodeContext.getState().equals(ACTIVE)) {
+        return namenodeContexts;
+      }
+      /*
+       * Rotate only if the first nn in the cache is still the nn whose
+       * priority needs to be lowered; otherwise another thread has already
+       * rotated the cache and there is nothing left to do.
+       */
+      if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
+        List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
+        Collections.rotate(rotatedNnContexts, -1);
+        String firstNamenodeId = rotatedNnContexts.get(0).getNamenodeId();
+        LOG.info("Rotate cache of pair <{}, {}>, put namenode: {} in the " +
+            "first position of the cache and namenode: {} in the last position of the cache",
+            nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId());
+        return rotatedNnContexts;
+      }
+      return namenodeContexts;
+    });
+  }
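The heart of the resolver change above is Collections.rotate on a copy of the cached namenode list. The following standalone sketch is not part of the patch; it only demonstrates the effect of rotating by -1 on a priority list (the namenode ids nn0, nn1, nn2 are made up for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class RotateSketch {
      public static void main(String[] args) {
        // Cached priority order: the first entry is tried first.
        List<String> nns = new ArrayList<>(Arrays.asList("nn0", "nn1", "nn2"));
        // Shift every element one position to the left; the failed first
        // entry wraps around to the end, exactly as rotateCache does.
        Collections.rotate(nns, -1);
        System.out.println(nns); // prints [nn1, nn2, nn0]
      }
    }

After the rotation, nn0 has the lowest priority and nn1 becomes the first namenode tried on the next access.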
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 321d97e5da..b38900c3bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -599,6 +599,8 @@ public Object invokeMethod(
         }
         LOG.error("Cannot get available namenode for {} {} error: {}",
             nsId, rpcAddress, ioe.getMessage());
+        // Rotate the cache so that the client can retry the next namenode in the cache
+        this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
         // Throw RetriableException so that client can retry
         throw new RetriableException(ioe);
       } else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index 2c70395870..bf22cf0114 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -1202,4 +1202,13 @@ public void waitClusterUp() throws IOException {
       throw new IOException("Cannot wait for the namenodes", e);
     }
   }
+
+  /**
+   * Get the cache flush interval in milliseconds.
+   *
+   * @return Cache flush interval in milliseconds.
+   */
+  public long getCacheFlushInterval() {
+    return cacheFlushInterval;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index 4aaa8e7569..554879856a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -397,6 +397,11 @@ public List<String> getMountPoints(String path) throws IOException {
   public void setRouterId(String router) {
   }
 
+  @Override
+  public void rotateCache(
+      String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
+  }
+
   /**
    * Mocks the availability of default namespace.
    * @param b if true default namespace is unset.
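RouterRpcClient now calls rotateCache immediately before throwing RetriableException, so the client's retry lands on a different namenode instead of the same stale first entry. Because many handler threads can fail against the same namenode at once, the rotation must be atomic; the patch gets this from ConcurrentHashMap.compute, whose remapping function runs at most once per key at a time. Below is a minimal sketch of that pattern, not the patch's code: it assumes a simplified String-keyed cache, whereas the real cacheNS is keyed by a Pair of nameservice id and observer flag.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;

    public class AtomicRotateSketch {
      private final ConcurrentHashMap<String, List<String>> cache =
          new ConcurrentHashMap<>();

      public AtomicRotateSketch() {
        cache.put("ns0", Arrays.asList("nn0", "nn1"));
      }

      // Rotate atomically; a caller that lost the race fails the guard
      // and leaves the already-rotated list untouched.
      void rotateIfStillFirst(String nsId, String failedNn) {
        cache.compute(nsId, (ns, nns) -> {
          // Guard: another thread may have already rotated this key.
          if (nns == null || nns.size() <= 1 || !nns.get(0).equals(failedNn)) {
            return nns;
          }
          List<String> rotated = new ArrayList<>(nns);
          Collections.rotate(rotated, -1);
          return rotated;
        });
      }
    }

This mirrors the two guards in MembershipNamenodeResolver.rotateCache: the null/size check and the getRpcAddress() equality check both exist to make concurrent rotations idempotent.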
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
index 8d77654680..176ac4b078 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
@@ -42,16 +44,19 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
@@ -357,6 +362,82 @@ public void testNoNamenodesAvailable() throws Exception{
     assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
   }
 
+  /**
+   * When a failover occurs, the router may record that the nameservice has
+   * no active namenode, and the stale in-memory state is only corrected on
+   * the next cache refresh, so the router can keep reporting
+   * NoNamenodesAvailableException for a long time.
+   */
+  @Test
+  public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
+    setupCluster(false, true);
+    transitionClusterNSToStandby(cluster);
+    for (RouterContext routerContext : cluster.getRouters()) {
+      // Manually trigger the heartbeat
+      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
+          .getRouter().getNamenodeHeartbeatServices();
+      for (NamenodeHeartbeatService service : heartbeatServices) {
+        service.periodicInvoke();
+      }
+      // Update the service cache
+      routerContext.getRouter().getStateStore().refreshCaches(true);
+    }
+    // Record the time after the router first updated the cache
+    long firstLoadTime = Time.now();
+    List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();
+
+    // Make sure all namenodes are in the standby state
+    for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
+      assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState());
+    }
+
+    Configuration conf = cluster.getRouterClientConf();
+    // Set dfs.client.failover.random.order false, to pick the 1st router first
+    conf.setBoolean("dfs.client.failover.random.order", false);
+
+    DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
+
+    for (RouterContext routerContext : cluster.getRouters()) {
+      // Get the second namenode in the router cache and make it active
+      List<? extends FederationNamenodeContext> ns0 = routerContext.getRouter()
+          .getNamenodeResolver()
+          .getNamenodesForNameserviceId("ns0", false);
+
+      String namenodeId = ns0.get(1).getNamenodeId();
+      cluster.switchToActive("ns0", namenodeId);
+      // Manually trigger the heartbeat, but do not refresh the router cache
+      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
+          .getRouter().getNamenodeHeartbeatServices();
+      for (NamenodeHeartbeatService service : heartbeatServices) {
+        service.periodicInvoke();
+      }
+      assertEquals(ACTIVE.ordinal(),
+          cluster.getNamenode("ns0", namenodeId).getNamenode().getNameNodeState());
+    }
+
+    // Get router0 metrics
+    FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
+        .getRouter().getRpcServer().getRPCMetrics();
+    // Original failures
+    long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
+
+    /*
+     * At this point the router still has two standby namenodes recorded in
+     * memory and the first namenode it tries is indeed standby, so the first
+     * access reports a NoNamenodesAvailableException, rotates the cache, and
+     * the next access succeeds.
+     */
+    routerClient.getFileInfo("/");
+    long successReadTime = Time.now();
+    assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes());
+
+    /*
+     * The active namenode was reached without waiting for the router to
+     * refresh its cache, even though both entries in router memory were standby.
+     */
+    assertTrue(successReadTime - firstLoadTime < cluster.getCacheFlushInterval());
+  }
+
   @Test
   public void testAsyncCallerPoolMetrics() throws Exception {
     setupCluster(true, false);
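Taken together, the change means a router no longer has to wait for the next periodic cache refresh (the interval exposed by getCacheFlushInterval in the mini cluster) before it can reach the namenode that became active after a failover. The toy simulation below is not Hadoop code; the nn0/nn1 names and the "recorded state" are stand-ins used only to contrast the retry behavior before and after the patch:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class FailoverRetrySketch {
      public static void main(String[] args) {
        // Cached order after failover: nn0 is recorded active but is now
        // standby, nn1 is recorded standby but is the real active.
        List<String> cached = new ArrayList<>(Arrays.asList("nn0", "nn1"));

        // Old behavior: every retry consults the same unrotated cache,
        // so every attempt hits the stale standby entry until a refresh.
        for (int attempt = 1; attempt <= 3; attempt++) {
          System.out.println("without rotation, attempt " + attempt
              + " -> " + cached.get(0));
        }

        // New behavior: a failed attempt rotates the cache before the
        // RetriableException-driven retry, so the second attempt lands
        // on the namenode that actually became active.
        System.out.println("with rotation, attempt 1 -> " + cached.get(0));
        Collections.rotate(cached, -1);
        System.out.println("with rotation, attempt 2 -> " + cached.get(0));
      }
    }

The test above asserts exactly this: one NoNamenodesAvailableException is counted, the retried getFileInfo succeeds, and the elapsed time stays below the cache flush interval.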