HDFS-17354:Delay invoke clearStaleNamespacesInRouterStateIdContext during router start up (#6498)
This commit is contained in:
parent
095dfcca30
commit
12498b35bb
@ -430,6 +430,9 @@ public RouterRpcServer(Configuration conf, Router router,
|
|||||||
* Clear expired namespace in the shared RouterStateIdContext.
|
* Clear expired namespace in the shared RouterStateIdContext.
|
||||||
*/
|
*/
|
||||||
private void clearStaleNamespacesInRouterStateIdContext() {
|
private void clearStaleNamespacesInRouterStateIdContext() {
|
||||||
|
if (!router.isRouterState(RouterServiceState.RUNNING)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
final Set<String> resolvedNamespaces = namenodeResolver.getNamespaces()
|
final Set<String> resolvedNamespaces = namenodeResolver.getNamespaces()
|
||||||
.stream()
|
.stream()
|
||||||
|
@ -336,7 +336,14 @@ public synchronized boolean registerNamenode(NamenodeStatusReport report)
|
|||||||
@Override
|
@Override
|
||||||
public synchronized Set<FederationNamespaceInfo> getNamespaces()
|
public synchronized Set<FederationNamespaceInfo> getNamespaces()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return Collections.unmodifiableSet(this.namespaces);
|
Set<FederationNamespaceInfo> ret = new TreeSet<>();
|
||||||
|
Set<String> disabled = getDisabledNamespaces();
|
||||||
|
for (FederationNamespaceInfo ns : namespaces) {
|
||||||
|
if (!disabled.contains(ns.getNameserviceId())) {
|
||||||
|
ret.add(ns);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableSet(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearDisableNamespaces() {
|
public void clearDisableNamespaces() {
|
||||||
|
@ -23,10 +23,12 @@
|
|||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST_STRING;
|
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST_STRING;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS;
|
||||||
import static org.apache.hadoop.ipc.CallerContext.PROXY_USER_PORT;
|
import static org.apache.hadoop.ipc.CallerContext.PROXY_USER_PORT;
|
||||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
@ -72,6 +74,7 @@
|
|||||||
import org.apache.hadoop.fs.SafeModeAction;
|
import org.apache.hadoop.fs.SafeModeAction;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
@ -115,6 +118,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
@ -2327,4 +2331,46 @@ public void testGetListingOrder() throws Exception {
|
|||||||
fileSystem1.delete(new Path(testPath2), true);
|
fileSystem1.delete(new Path(testPath2), true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearStaleNamespacesInRouterStateIdContext() throws Exception {
|
||||||
|
Router testRouter = new Router();
|
||||||
|
Configuration routerConfig = DFSRouter.getConfiguration();
|
||||||
|
routerConfig.set(FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, "2000");
|
||||||
|
routerConfig.set(RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, "false");
|
||||||
|
// Mock resolver classes
|
||||||
|
routerConfig.setClass(RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
|
||||||
|
MockResolver.class, ActiveNamenodeResolver.class);
|
||||||
|
routerConfig.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
||||||
|
MockResolver.class, FileSubclusterResolver.class);
|
||||||
|
|
||||||
|
testRouter.init(routerConfig);
|
||||||
|
String nsID1 = cluster.getNameservices().get(0);
|
||||||
|
String nsID2 = cluster.getNameservices().get(1);
|
||||||
|
MockResolver resolver = (MockResolver)testRouter.getNamenodeResolver();
|
||||||
|
resolver.registerNamenode(createNamenodeReport(nsID1, "nn1",
|
||||||
|
HAServiceProtocol.HAServiceState.ACTIVE));
|
||||||
|
resolver.registerNamenode(createNamenodeReport(nsID2, "nn1",
|
||||||
|
HAServiceProtocol.HAServiceState.ACTIVE));
|
||||||
|
|
||||||
|
RouterRpcServer rpcServer = testRouter.getRpcServer();
|
||||||
|
|
||||||
|
rpcServer.getRouterStateIdContext().getNamespaceStateId(nsID1);
|
||||||
|
rpcServer.getRouterStateIdContext().getNamespaceStateId(nsID2);
|
||||||
|
|
||||||
|
resolver.disableNamespace(nsID1);
|
||||||
|
Thread.sleep(3000);
|
||||||
|
RouterStateIdContext context = rpcServer.getRouterStateIdContext();
|
||||||
|
assertEquals(2, context.getNamespaceIdMap().size());
|
||||||
|
|
||||||
|
testRouter.start();
|
||||||
|
Thread.sleep(3000);
|
||||||
|
// wait clear stale namespaces
|
||||||
|
RouterStateIdContext routerStateIdContext = rpcServer.getRouterStateIdContext();
|
||||||
|
int size = routerStateIdContext.getNamespaceIdMap().size();
|
||||||
|
assertEquals(1, size);
|
||||||
|
rpcServer.stop();
|
||||||
|
rpcServer.close();
|
||||||
|
testRouter.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user