diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c417316343..4f56362f39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -203,6 +203,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, /** Router using this RPC server. */ private final Router router; + /** Alignment context storing state IDs for all namespaces this router serves. */ + private final RouterStateIdContext routerStateIdContext; + /** The RPC server that listens to requests from clients. */ private final Server rpcServer; /** The address for this RPC server. */ @@ -321,7 +324,7 @@ public RouterRpcServer(Configuration conf, Router router, // Create security manager this.securityManager = new RouterSecurityManager(this.conf); - RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); + routerStateIdContext = new RouterStateIdContext(conf); this.rpcServer = new RPC.Builder(this.conf) .setProtocol(ClientNamenodeProtocolPB.class) @@ -510,6 +513,15 @@ BalanceProcedureScheduler getFedRenameScheduler() { return this.fedRenameScheduler; } + /** + * Get the routerStateIdContext used by this server. + * @return routerStateIdContext + */ + @VisibleForTesting + protected RouterStateIdContext getRouterStateIdContext() { + return routerStateIdContext; + } + /** * Get the RPC security manager. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 23e72546aa..48515047fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThrows; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import java.io.IOException; @@ -31,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -91,6 +93,7 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); if (confOverrides != null) { conf.addResource(confOverrides); } @@ -545,4 +548,20 @@ public void testClientReceiveResponseState() { Assertions.assertEquals(1, latestFederateState.size()); Assertions.assertEquals(10L, latestFederateState.get("ns0")); } + + @Test + public void testStateIdProgressionInRouter() throws Exception { + Path rootPath = new Path("/"); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + RouterStateIdContext routerStateIdContext = routerContext + .getRouterRpcServer() + .getRouterStateIdContext(); + for (int i = 0; i < 10; i++) { + fileSystem.create(new Path(rootPath, "file" + i)).close(); + } + + // Get object storing state of the namespace in the shared RouterStateIdContext + LongAccumulator namespaceStateId = routerStateIdContext.getNamespaceStateId("ns0"); + assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get()); + } }