diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index aa9577330c..2e55323819 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -349,6 +349,12 @@ public static ClientProtocol createProxyWithAlignmentContext( boolean withRetries, AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) throws IOException { + if (alignmentContext == null && + conf.getBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT)) { + alignmentContext = new ClientGSIContext(); + } + RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index e3e01fde3a..2b511bfc2e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -78,6 +78,8 @@ public interface HdfsClientConfigKeys { int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871; String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; + String DFS_RBF_OBSERVER_READ_ENABLE = "dfs.client.rbf.observer.read.enable"; + boolean DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT = false; int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020; String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.namenode.kerberos.principal"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index bdf4697d2a..2c70395870 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -234,7 +234,12 @@ public FileSystem getFileSystem() throws IOException { return DistributedFileSystem.get(conf); } - public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException { + public FileSystem getFileSystem(Configuration configuration) throws IOException { + configuration.addResource(conf); + return DistributedFileSystem.get(configuration); + } + + public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOException { Configuration observerReadConf = new Configuration(conf); observerReadConf.set(DFS_NAMESERVICES, observerReadConf.get(DFS_NAMESERVICES)+ ",router-service"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 23095186d0..e38b0b2a35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; @@ -122,11 +123,17 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th cluster.waitActiveNamespaces(); routerContext = cluster.getRandomRouter(); - fileSystem = routerContext.getFileSystemWithObserverReadsEnabled(); + } + + private static Configuration getConfToEnableObserverReads() { + Configuration conf = new Configuration(); + conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true); + return conf; } @Test public void testObserverRead() throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); internalTestObserverRead(); } @@ -137,7 +144,6 @@ public void testObserverRead() throws Exception { */ @Test public void testReadWithoutObserverClientConfigurations() throws Exception { - fileSystem.close(); fileSystem = routerContext.getFileSystem(); assertThrows(AssertionError.class, this::internalTestObserverRead); } @@ -173,6 +179,7 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception Configuration confOverrides = new Configuration(false); confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); startUpCluster(2, confOverrides); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); @@ -202,6 +209,7 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception Configuration confOverrides = new Configuration(false); confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); startUpCluster(2, confOverrides); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); Path path = new Path("/testFile"); fileSystem.create(path).close(); @@ -219,6 +227,7 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception @Test public void testReadWhenObserverIsDown() throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); @@ -246,6 +255,7 @@ public void testReadWhenObserverIsDown() throws Exception { @Test public void testMultipleObserver() throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); @@ -384,6 +394,7 @@ public void testMultipleObserverRouter() throws Exception { @Test public void testUnavailableObserverNN() throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); stopObserver(2); Path path = new Path("/testFile"); @@ -417,10 +428,9 @@ public void testUnavailableObserverNN() throws Exception { assertTrue("There must be unavailable namenodes", hasUnavailable); } - - @Test public void testRouterMsync() throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); Path path = new Path("/testFile"); // Send Create call to active @@ -439,4 +449,60 @@ public void testRouterMsync() throws Exception { assertEquals("Four calls should be sent to active", 4, rpcCountForActive); } + + @Test + public void testSingleRead() throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + // Send read request + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // getListingCall sent to active. + assertEquals("Only one call should be sent to active", 1, rpcCountForActive); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getList call should be sent to observer + assertEquals("No calls should be sent to observer", 0, rpcCountForObserver); + } + + @Test + public void testSingleReadUsingObserverReadProxyProvider() throws Exception { + fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider(); + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + // Send read request + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Two msync calls to the active namenodes. + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getList call should be sent to observer + assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index e4cb5b9ffe..2a0a4945fa 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6442,4 +6442,11 @@ If the namespace is DEFAULT, it's best to change this conf to other value. + + dfs.client.rbf.observer.read.enable + false + + Enables observer reads for clients. This should only be enabled when clients are using routers. + +