HDFS-16821: Fixes regression in HDFS-13522 that enables observer reads by default (#5078)

This commit is contained in:
Simbarashe Dzinamarira 2022-11-07 08:43:04 -08:00 committed by GitHub
parent 660530205e
commit 44b8bb7224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 49 deletions

View File

@ -349,9 +349,6 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth, boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext) AlignmentContext alignmentContext)
throws IOException { throws IOException {
if (alignmentContext == null) {
alignmentContext = new ClientGSIContext();
}
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class); ProtobufRpcEngine2.class);

View File

@ -91,6 +91,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf); return DistributedFileSystem.get(conf);
} }
public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException {
Configuration observerReadConf = new Configuration(conf);
observerReadConf.set(DFS_NAMESERVICES,
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
getFileSystemURI().toString());
observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + "router-service", ObserverReadProxyProvider.class.getName());
DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");
return DistributedFileSystem.get(observerReadConf);
}
public DFSClient getClient(UserGroupInformation user) public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
@ -41,15 +42,40 @@
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After; import org.junit.jupiter.api.Test;
import org.junit.Test; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
public class TestObserverWithRouter { public class TestObserverWithRouter {
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
private MiniRouterDFSCluster cluster; private MiniRouterDFSCluster cluster;
private RouterContext routerContext;
private FileSystem fileSystem;
public void startUpCluster(int numberOfObserver) throws Exception { @BeforeEach
startUpCluster(numberOfObserver, null); void init(TestInfo info) throws Exception {
if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
return;
}
startUpCluster(2, null);
}
@AfterEach
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
routerContext = null;
if (fileSystem != null) {
fileSystem.close();
fileSystem = null;
}
} }
public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
cluster.installMockLocations(); cluster.installMockLocations();
cluster.waitActiveNamespaces(); cluster.waitActiveNamespaces();
} routerContext = cluster.getRandomRouter();
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled();
@After
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
} }
@Test @Test
public void testObserverRead() throws Exception { public void testObserverRead() throws Exception {
startUpCluster(1); internalTestObserverRead();
RouterContext routerContext = cluster.getRandomRouter(); }
/**
* Tests that without adding config to use ObserverProxyProvider, the client shouldn't
* have reads served by Observers.
* Fixes regression in HDFS-13522.
*/
@Test
public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem.close();
fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead);
}
public void internalTestObserverRead()
throws Exception {
List<? extends FederationNamenodeContext> namenodes = routerContext List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver() .getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(), assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER); FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile"); Path path = new Path("/testFile");
// Send Create call to active // Send create call
fileSystem.create(path).close(); fileSystem.create(path).close();
// Send read request to observer // Send read request
fileSystem.open(path).close(); fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer() long rpcCountForActive = routerContext.getRouter().getRpcServer()
@ -131,21 +165,19 @@ public void testObserverRead() throws Exception {
.getRPCMetrics().getObserverProxyOps(); .getRPCMetrics().getObserverProxyOps();
// getBlockLocations should be sent to observer // getBlockLocations should be sent to observer
assertEquals("One call should be sent to observer", 1, rpcCountForObserver); assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
fileSystem.close();
} }
@Test @Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testObserverReadWithoutFederatedStatePropagation() throws Exception { public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
Configuration confOverrides = new Configuration(false); Configuration confOverrides = new Configuration(false);
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
startUpCluster(1, confOverrides); startUpCluster(2, confOverrides);
RouterContext routerContext = cluster.getRandomRouter();
List<? extends FederationNamenodeContext> namenodes = routerContext List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver() .getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(), assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER); FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile"); Path path = new Path("/testFile");
// Send Create call to active // Send Create call to active
fileSystem.create(path).close(); fileSystem.create(path).close();
@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception
long rpcCountForObserver = routerContext.getRouter().getRpcServer() long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps(); .getRPCMetrics().getObserverProxyOps();
assertEquals("No call should be sent to observer", 0, rpcCountForObserver); assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
fileSystem.close();
} }
@Test @Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testDisablingObserverReadUsingNameserviceOverride() throws Exception { public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
// Disable observer reads using per-nameservice override // Disable observer reads using per-nameservice override
Configuration confOverrides = new Configuration(false); Configuration confOverrides = new Configuration(false);
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
startUpCluster(1, confOverrides); startUpCluster(2, confOverrides);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile"); Path path = new Path("/testFile");
fileSystem.create(path).close(); fileSystem.create(path).close();
fileSystem.open(path).close(); fileSystem.open(path).close();
fileSystem.close();
long rpcCountForActive = routerContext.getRouter().getRpcServer() long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps(); .getRPCMetrics().getActiveProxyOps();
@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception
@Test @Test
public void testReadWhenObserverIsDown() throws Exception { public void testReadWhenObserverIsDown() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile1"); Path path = new Path("/testFile1");
// Send Create call to active // Send Create call to active
fileSystem.create(path).close(); fileSystem.create(path).close();
// Stop observer NN // Stop observer NN
int nnIndex = stopObserver(1); int nnIndex = stopObserver(1);
assertNotEquals("No observer found", 3, nnIndex); assertNotEquals("No observer found", 3, nnIndex);
nnIndex = stopObserver(1);
assertNotEquals("No observer found", 4, nnIndex);
// Send read request // Send read request
fileSystem.open(path).close(); fileSystem.open(path).close();
@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception {
.getRPCMetrics().getObserverProxyOps(); .getRPCMetrics().getObserverProxyOps();
assertEquals("No call should send to observer", 0, assertEquals("No call should send to observer", 0,
rpcCountForObserver); rpcCountForObserver);
fileSystem.close();
} }
@Test @Test
public void testMultipleObserver() throws Exception { public void testMultipleObserver() throws Exception {
startUpCluster(2);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile1"); Path path = new Path("/testFile1");
// Send Create call to active // Send Create call to active
fileSystem.create(path).close(); fileSystem.create(path).close();
@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception {
.getRpcServer().getRPCMetrics().getObserverProxyOps(); .getRpcServer().getRPCMetrics().getObserverProxyOps();
assertEquals("No call should send to observer", assertEquals("No call should send to observer",
expectedObserverRpc, rpcCountForObserver); expectedObserverRpc, rpcCountForObserver);
fileSystem.close();
} }
private int stopObserver(int num) { private int stopObserver(int num) {
@ -288,9 +310,9 @@ private int stopObserver(int num) {
// test router observer with multiple to know which observer NN received // test router observer with multiple to know which observer NN received
// requests // requests
@Test @Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testMultipleObserverRouter() throws Exception { public void testMultipleObserverRouter() throws Exception {
StateStoreDFSCluster innerCluster; StateStoreDFSCluster innerCluster;
RouterContext routerContext;
MembershipNamenodeResolver resolver; MembershipNamenodeResolver resolver;
String ns0; String ns0;
@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception {
namespaceInfo0.get(1).getNamenodeId()); namespaceInfo0.get(1).getNamenodeId());
assertEquals(namespaceInfo1.get(0).getState(), assertEquals(namespaceInfo1.get(0).getState(),
FederationNamenodeServiceState.OBSERVER); FederationNamenodeServiceState.OBSERVER);
innerCluster.shutdown();
} }
@Test @Test
public void testUnavailableObserverNN() throws Exception { public void testUnavailableObserverNN() throws Exception {
startUpCluster(2);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
stopObserver(2); stopObserver(2);
Path path = new Path("/testFile"); Path path = new Path("/testFile");
@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception {
assertTrue("There must be unavailable namenodes", hasUnavailable); assertTrue("There must be unavailable namenodes", hasUnavailable);
} }
@Test @Test
public void testRouterMsync() throws Exception { public void testRouterMsync() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile"); Path path = new Path("/testFile");
// Send Create call to active // Send Create call to active
@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception {
// 2 msync calls should be sent. One to each active namenode in the two namespaces. // 2 msync calls should be sent. One to each active namenode in the two namespaces.
assertEquals("Four calls should be sent to active", 4, assertEquals("Four calls should be sent to active", 4,
rpcCountForActive); rpcCountForActive);
fileSystem.close();
} }
} }