HDFS-16845: Adds configuration flag to allow clients to use router observer reads without using the ObserverReadProxyProvider. (#5142)

This commit is contained in:
Simbarashe Dzinamarira 2022-11-28 16:49:10 -08:00 committed by GitHub
parent ec2856d79c
commit 909aeca86c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 91 additions and 5 deletions

View File

@ -349,6 +349,12 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth, boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext) AlignmentContext alignmentContext)
throws IOException { throws IOException {
if (alignmentContext == null &&
conf.getBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE,
HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT)) {
alignmentContext = new ClientGSIContext();
}
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class); ProtobufRpcEngine2.class);

View File

@ -78,6 +78,8 @@ public interface HdfsClientConfigKeys {
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
String DFS_RBF_OBSERVER_READ_ENABLE = "dfs.client.rbf.observer.read.enable";
boolean DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT = false;
int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020; int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
"dfs.namenode.kerberos.principal"; "dfs.namenode.kerberos.principal";

View File

@ -234,7 +234,12 @@ public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf); return DistributedFileSystem.get(conf);
} }
public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException { public FileSystem getFileSystem(Configuration configuration) throws IOException {
configuration.addResource(conf);
return DistributedFileSystem.get(configuration);
}
public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOException {
Configuration observerReadConf = new Configuration(conf); Configuration observerReadConf = new Configuration(conf);
observerReadConf.set(DFS_NAMESERVICES, observerReadConf.set(DFS_NAMESERVICES,
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service"); observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@ -122,11 +123,17 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
cluster.waitActiveNamespaces(); cluster.waitActiveNamespaces();
routerContext = cluster.getRandomRouter(); routerContext = cluster.getRandomRouter();
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled(); }
private static Configuration getConfToEnableObserverReads() {
Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
return conf;
} }
@Test @Test
public void testObserverRead() throws Exception { public void testObserverRead() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
internalTestObserverRead(); internalTestObserverRead();
} }
@ -137,7 +144,6 @@ public void testObserverRead() throws Exception {
*/ */
@Test @Test
public void testReadWithoutObserverClientConfigurations() throws Exception { public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem.close();
fileSystem = routerContext.getFileSystem(); fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead); assertThrows(AssertionError.class, this::internalTestObserverRead);
} }
@ -173,6 +179,7 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception
Configuration confOverrides = new Configuration(false); Configuration confOverrides = new Configuration(false);
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
startUpCluster(2, confOverrides); startUpCluster(2, confOverrides);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
List<? extends FederationNamenodeContext> namenodes = routerContext List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver() .getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
@ -202,6 +209,7 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception
Configuration confOverrides = new Configuration(false); Configuration confOverrides = new Configuration(false);
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
startUpCluster(2, confOverrides); startUpCluster(2, confOverrides);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile"); Path path = new Path("/testFile");
fileSystem.create(path).close(); fileSystem.create(path).close();
@ -219,6 +227,7 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception
@Test @Test
public void testReadWhenObserverIsDown() throws Exception { public void testReadWhenObserverIsDown() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile1"); Path path = new Path("/testFile1");
// Send Create call to active // Send Create call to active
fileSystem.create(path).close(); fileSystem.create(path).close();
@ -246,6 +255,7 @@ public void testReadWhenObserverIsDown() throws Exception {
@Test @Test
public void testMultipleObserver() throws Exception { public void testMultipleObserver() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile1"); Path path = new Path("/testFile1");
// Send Create call to active // Send Create call to active
fileSystem.create(path).close(); fileSystem.create(path).close();
@ -384,6 +394,7 @@ public void testMultipleObserverRouter() throws Exception {
@Test @Test
public void testUnavailableObserverNN() throws Exception { public void testUnavailableObserverNN() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
stopObserver(2); stopObserver(2);
Path path = new Path("/testFile"); Path path = new Path("/testFile");
@ -417,10 +428,9 @@ public void testUnavailableObserverNN() throws Exception {
assertTrue("There must be unavailable namenodes", hasUnavailable); assertTrue("There must be unavailable namenodes", hasUnavailable);
} }
@Test @Test
public void testRouterMsync() throws Exception { public void testRouterMsync() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile"); Path path = new Path("/testFile");
// Send Create call to active // Send Create call to active
@ -439,4 +449,60 @@ public void testRouterMsync() throws Exception {
assertEquals("Four calls should be sent to active", 4, assertEquals("Four calls should be sent to active", 4,
rpcCountForActive); rpcCountForActive);
} }
@Test
public void testSingleRead() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
// Send read request
fileSystem.listFiles(path, false);
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// getListingCall sent to active.
assertEquals("Only one call should be sent to active", 1, rpcCountForActive);
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getList call should be sent to observer
assertEquals("No calls should be sent to observer", 0, rpcCountForObserver);
}
@Test
public void testSingleReadUsingObserverReadProxyProvider() throws Exception {
fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider();
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
// Send read request
fileSystem.listFiles(path, false);
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Two msync calls to the active namenodes.
assertEquals("Two calls should be sent to active", 2, rpcCountForActive);
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getList call should be sent to observer
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
}
} }

View File

@ -6442,4 +6442,11 @@
If the namespace is DEFAULT, it's best to change this conf to other value. If the namespace is DEFAULT, it's best to change this conf to other value.
</description> </description>
</property> </property>
<property>
<name>dfs.client.rbf.observer.read.enable</name>
<value>false</value>
<description>
Enables observer reads for clients. This should only be enabled when clients are using routers.
</description>
</property>
</configuration> </configuration>