diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 4acec82824..aa9577330c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -349,9 +349,6 @@ public static ClientProtocol createProxyWithAlignmentContext( boolean withRetries, AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) throws IOException { - if (alignmentContext == null) { - alignmentContext = new ClientGSIContext(); - } RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 4fcdf6595e..bdf4697d2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; @@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException { return DistributedFileSystem.get(conf); } + public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException { + Configuration observerReadConf = new Configuration(conf); + observerReadConf.set(DFS_NAMESERVICES, + observerReadConf.get(DFS_NAMESERVICES)+ ",router-service"); + observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1"); + observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1", + getFileSystemURI().toString()); + observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + "router-service", ObserverReadProxyProvider.class.getName()); + DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service"); + + return DistributedFileSystem.get(observerReadConf); + } + public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index fbd731c073..23095186d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; @@ -41,15 +42,40 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.junit.After; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; + public class TestObserverWithRouter { - + private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup"; private MiniRouterDFSCluster cluster; + private RouterContext routerContext; + private FileSystem fileSystem; - public void startUpCluster(int numberOfObserver) throws Exception { - startUpCluster(numberOfObserver, null); + @BeforeEach + void init(TestInfo info) throws Exception { + if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) { + return; + } + startUpCluster(2, null); + } + + @AfterEach + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + + routerContext = null; + + if (fileSystem != null) { + fileSystem.close(); + fileSystem = null; + } } public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { @@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th cluster.installMockLocations(); cluster.waitActiveNamespaces(); - } - - @After - public void teardown() throws IOException { - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } + routerContext = cluster.getRandomRouter(); + fileSystem = routerContext.getFileSystemWithObserverReadsEnabled(); } @Test public void testObserverRead() throws Exception { - startUpCluster(1); - RouterContext routerContext = cluster.getRandomRouter(); + internalTestObserverRead(); + } + + /** + * Tests that without adding config to use ObserverProxyProvider, the client shouldn't + * have reads served by Observers. + * Fixes regression in HDFS-13522. + */ + @Test + public void testReadWithoutObserverClientConfigurations() throws Exception { + fileSystem.close(); + fileSystem = routerContext.getFileSystem(); + assertThrows(AssertionError.class, this::internalTestObserverRead); + } + + public void internalTestObserverRead() + throws Exception { List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); assertEquals("First namenode should be observer", namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); - // Send Create call to active + // Send create call fileSystem.create(path).close(); - // Send read request to observer + // Send read request fileSystem.open(path).close(); long rpcCountForActive = routerContext.getRouter().getRpcServer() @@ -131,21 +165,19 @@ public void testObserverRead() throws Exception { .getRPCMetrics().getObserverProxyOps(); // getBlockLocations should be sent to observer assertEquals("One call should be sent to observer", 1, rpcCountForObserver); - fileSystem.close(); } @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testObserverReadWithoutFederatedStatePropagation() throws Exception { Configuration confOverrides = new Configuration(false); confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); - startUpCluster(1, confOverrides); - RouterContext routerContext = cluster.getRandomRouter(); + startUpCluster(2, confOverrides); List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); assertEquals("First namenode should be observer", namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); // Send Create call to active fileSystem.create(path).close(); @@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); assertEquals("No call should be sent to observer", 0, rpcCountForObserver); - fileSystem.close(); } @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testDisablingObserverReadUsingNameserviceOverride() throws Exception { // Disable observer reads using per-nameservice override Configuration confOverrides = new Configuration(false); confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); - startUpCluster(1, confOverrides); + startUpCluster(2, confOverrides); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); fileSystem.create(path).close(); fileSystem.open(path).close(); - fileSystem.close(); long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); @@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception @Test public void testReadWhenObserverIsDown() throws Exception { - startUpCluster(1); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); // Stop observer NN int nnIndex = stopObserver(1); - assertNotEquals("No observer found", 3, nnIndex); + nnIndex = stopObserver(1); + assertNotEquals("No observer found", 4, nnIndex); // Send read request fileSystem.open(path).close(); @@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception { .getRPCMetrics().getObserverProxyOps(); assertEquals("No call should send to observer", 0, rpcCountForObserver); - fileSystem.close(); } @Test public void testMultipleObserver() throws Exception { - startUpCluster(2); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); @@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception { .getRpcServer().getRPCMetrics().getObserverProxyOps(); assertEquals("No call should send to observer", expectedObserverRpc, rpcCountForObserver); - fileSystem.close(); } private int stopObserver(int num) { @@ -288,9 +310,9 @@ private int stopObserver(int num) { // test router observer with multiple to know which observer NN received // requests @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testMultipleObserverRouter() throws Exception { StateStoreDFSCluster innerCluster; - RouterContext routerContext; MembershipNamenodeResolver resolver; String ns0; @@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception { namespaceInfo0.get(1).getNamenodeId()); assertEquals(namespaceInfo1.get(0).getState(), FederationNamenodeServiceState.OBSERVER); + + innerCluster.shutdown(); } @Test public void testUnavailableObserverNN() throws Exception { - startUpCluster(2); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); - stopObserver(2); Path path = new Path("/testFile"); @@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception { assertTrue("There must be unavailable namenodes", hasUnavailable); } + + @Test public void testRouterMsync() throws Exception { - startUpCluster(1); - RouterContext routerContext = cluster.getRandomRouter(); - - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); // Send Create call to active @@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception { // 2 msync calls should be sent. One to each active namenode in the two namespaces. assertEquals("Four calls should be sent to active", 4, rpcCountForActive); - fileSystem.close(); } } \ No newline at end of file