diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java index 1f2b12d445..2f7195f36b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java @@ -64,7 +64,12 @@ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder h */ @Override public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) { - sharedGlobalStateId.accumulate(header.getStateId()); + if (header.getStateId() == 0 && sharedGlobalStateId.get() > 0) { + sharedGlobalStateId.reset(); + poolLocalStateId.reset(); + } else { + sharedGlobalStateId.accumulate(header.getStateId()); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 2f8beb20f7..eaee5b8b14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -107,10 +107,7 @@ public void teardown() throws IOException { public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { int numberOfNamenode = 2 + numberOfObserver; Configuration conf = new Configuration(false); - conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); - conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); - conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); - conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); + setConfDefaults(conf); if (confOverrides != null) { confOverrides .iterator() @@ -153,6 +150,13 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th routerContext = cluster.getRandomRouter(); } + private void setConfDefaults(Configuration conf) { + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); + } + public enum ConfigSetting { USE_NAMENODE_PROXY_FLAG, USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER, @@ -972,4 +976,55 @@ public void testMsyncWithNoNamespacesEligibleForCRS(ConfigSetting configSetting) // There should no calls to any namespace. assertEquals("No calls to any namespace", 0, rpcCountForActive); } + + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testRestartingNamenodeWithStateIDContextDisabled(ConfigSetting configSetting) + throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + Path path = new Path("/testFile1"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request + fileSystem.open(path).close(); + + long observerCount1 = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + // Restart active namenodes and disable sending state id. + restartActiveWithStateIDContextDisabled(); + + Configuration conf = getConfToEnableObserverReads(configSetting); + conf.setBoolean("fs.hdfs.impl.disable.cache", true); + FileSystem fileSystem2 = routerContext.getFileSystem(conf); + fileSystem2.msync(); + fileSystem2.open(path).close(); + + long observerCount2 = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("There should no extra calls to the observer", observerCount1, observerCount2); + + fileSystem.open(path).close(); + long observerCount3 = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertTrue("Old filesystem will send calls to observer", observerCount3 > observerCount2); + } + + void restartActiveWithStateIDContextDisabled() throws Exception { + for (int nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) { + NameNode nameNode = cluster.getCluster().getNameNode(nnIndex); + if (nameNode != null && nameNode.isActiveState()) { + Configuration conf = new Configuration(); + setConfDefaults(conf); + cluster.getCluster().getConfiguration(nnIndex) + .setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, false); + cluster.getCluster().restartNameNode(nnIndex, true); + cluster.getCluster().getNameNode(nnIndex).isActiveState(); + } + } + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestPoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestPoolAlignmentContext.java index ef6745654c..f691f61728 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestPoolAlignmentContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestPoolAlignmentContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -50,4 +51,35 @@ private void assertRequestHeaderStateId(PoolAlignmentContext poolAlignmentContex poolAlignmentContext.updateRequestState(builder); Assertions.assertEquals(expectedValue, builder.getStateId()); } + + @Test + public void testWhenNamenodeStopsSendingStateId() { + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration()); + String namespaceId = "namespace1"; + PoolAlignmentContext poolContext = new PoolAlignmentContext(routerStateIdContext, namespaceId); + + poolContext.receiveResponseState(getRpcResponseHeader(10L)); + // Last seen value is the one from namenode, + // but request header is the max seen by clients so far. + Assertions.assertEquals(10L, poolContext.getLastSeenStateId()); + assertRequestHeaderStateId(poolContext, Long.MIN_VALUE); + + poolContext.advanceClientStateId(10L); + assertRequestHeaderStateId(poolContext, 10L); + + // When namenode state context is disabled, it returns a stateId of zero + poolContext.receiveResponseState(getRpcResponseHeader(0)); + // Routers should reset the cached state Id to not send a stale value to the observer. + Assertions.assertEquals(Long.MIN_VALUE, poolContext.getLastSeenStateId()); + assertRequestHeaderStateId(poolContext, Long.MIN_VALUE); + } + + private RpcResponseHeaderProto getRpcResponseHeader(long stateID) { + return RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcResponseHeaderProto.RpcStatusProto.SUCCESS) + .setStateId(stateID) + .build(); + } }