HDFS-16877: Enables state context for namenode in TestObserverWithRouter (#5257)
This commit is contained in:
parent
b9eb760ed2
commit
cd19da1309
@ -203,6 +203,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
|
|||||||
/** Router using this RPC server. */
|
/** Router using this RPC server. */
|
||||||
private final Router router;
|
private final Router router;
|
||||||
|
|
||||||
|
/** Alignment context storing state IDs for all namespaces this router serves. */
|
||||||
|
private final RouterStateIdContext routerStateIdContext;
|
||||||
|
|
||||||
/** The RPC server that listens to requests from clients. */
|
/** The RPC server that listens to requests from clients. */
|
||||||
private final Server rpcServer;
|
private final Server rpcServer;
|
||||||
/** The address for this RPC server. */
|
/** The address for this RPC server. */
|
||||||
@ -321,7 +324,7 @@ public RouterRpcServer(Configuration conf, Router router,
|
|||||||
|
|
||||||
// Create security manager
|
// Create security manager
|
||||||
this.securityManager = new RouterSecurityManager(this.conf);
|
this.securityManager = new RouterSecurityManager(this.conf);
|
||||||
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
|
routerStateIdContext = new RouterStateIdContext(conf);
|
||||||
|
|
||||||
this.rpcServer = new RPC.Builder(this.conf)
|
this.rpcServer = new RPC.Builder(this.conf)
|
||||||
.setProtocol(ClientNamenodeProtocolPB.class)
|
.setProtocol(ClientNamenodeProtocolPB.class)
|
||||||
@ -510,6 +513,15 @@ BalanceProcedureScheduler getFedRenameScheduler() {
|
|||||||
return this.fedRenameScheduler;
|
return this.fedRenameScheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the routerStateIdContext used by this server.
|
||||||
|
* @return routerStateIdContext
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected RouterStateIdContext getRouterStateIdContext() {
|
||||||
|
return routerStateIdContext;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the RPC security manager.
|
* Get the RPC security manager.
|
||||||
*
|
*
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import static org.junit.Assert.assertThrows;
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -31,6 +32,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.LongAccumulator;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@ -91,6 +93,7 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
|
|||||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
||||||
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
|
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
|
||||||
|
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
|
||||||
if (confOverrides != null) {
|
if (confOverrides != null) {
|
||||||
conf.addResource(confOverrides);
|
conf.addResource(confOverrides);
|
||||||
}
|
}
|
||||||
@ -545,4 +548,20 @@ public void testClientReceiveResponseState() {
|
|||||||
Assertions.assertEquals(1, latestFederateState.size());
|
Assertions.assertEquals(1, latestFederateState.size());
|
||||||
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStateIdProgressionInRouter() throws Exception {
|
||||||
|
Path rootPath = new Path("/");
|
||||||
|
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
|
||||||
|
RouterStateIdContext routerStateIdContext = routerContext
|
||||||
|
.getRouterRpcServer()
|
||||||
|
.getRouterStateIdContext();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
fileSystem.create(new Path(rootPath, "file" + i)).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get object storing state of the namespace in the shared RouterStateIdContext
|
||||||
|
LongAccumulator namespaceStateId = routerStateIdContext.getNamespaceStateId("ns0");
|
||||||
|
assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user