diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadConfiguredFailoverProxyProvider.java new file mode 100644 index 0000000000..56a913520e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadConfiguredFailoverProxyProvider.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; + +/** + * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation + * to support automatic msync-ing when using routers. + *

+ * This constructs a wrapper proxy of ConfiguredFailoverProxyProvider, + * and allows to configure logical names for nameservices. + */ +public class RouterObserverReadConfiguredFailoverProxyProvider + extends RouterObserverReadProxyProvider { + + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(RouterObserverReadConfiguredFailoverProxyProvider.class); + + public RouterObserverReadConfiguredFailoverProxyProvider(Configuration conf, URI uri, + Class xface, HAProxyFactory factory) { + super(conf, uri, xface, factory, + new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java index e494e52429..9707a2a91c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java @@ -47,7 +47,7 @@ */ public class RouterObserverReadProxyProvider extends AbstractNNFailoverProxyProvider { @VisibleForTesting - static final Logger LOG = LoggerFactory.getLogger(ObserverReadProxyProvider.class); + static final Logger LOG = LoggerFactory.getLogger(RouterObserverReadProxyProvider.class); /** Client-side context for syncing with the NameNode server side. */ private final AlignmentContext alignmentContext; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 0ff166e0c3..2f8beb20f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -41,6 +44,7 @@ import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; @@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.test.GenericTestUtils; @@ -72,6 +77,10 @@ public class TestObserverWithRouter { private RouterContext routerContext; private FileSystem fileSystem; + private static final String ROUTER_NS_ID = "router-service"; + private static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = + "dfs.client.failover.observer.auto-msync-period"; + @BeforeEach void init(TestInfo info) throws Exception { if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) { @@ -146,7 +155,8 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th public enum ConfigSetting { USE_NAMENODE_PROXY_FLAG, - USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER + USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER, + USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER } private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) { @@ -162,6 +172,16 @@ private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) .getRpcServerAddress() .getHostName(), RouterObserverReadProxyProvider.class.getName()); break; + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // HA configs + conf.set(DFS_NAMESERVICES, ROUTER_NS_ID); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + ROUTER_NS_ID, "router1"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ "." + ROUTER_NS_ID + ".router1", + routerContext.getFileSystemURI().toString()); + DistributedFileSystem.setDefaultUri(conf, "hdfs://" + ROUTER_NS_ID); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ROUTER_NS_ID, + RouterObserverReadConfiguredFailoverProxyProvider.class.getName()); + break; default: Assertions.fail("Unknown config setting: " + configSetting); } @@ -755,8 +775,10 @@ public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSett @ParameterizedTest public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception { Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); - clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." + - routerContext.getRouter().getRpcServerAddress().getHostName(), 0); + String configKeySuffix = + configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 0); fileSystem = routerContext.getFileSystem(clientConfiguration); List namenodes = routerContext @@ -790,6 +812,7 @@ public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exceptio assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver); break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: // An msync is sent to each active namenode for each read. // Total msyncs will be (numListings * num_of_nameservices). assertEquals("Msyncs sent to the active namenodes", @@ -806,8 +829,10 @@ public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exceptio @ParameterizedTest public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception { Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); - clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." + - routerContext.getRouter().getRpcServerAddress().getHostName(), 3000); + String configKeySuffix = + configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000); fileSystem = routerContext.getFileSystem(clientConfiguration); List namenodes = routerContext @@ -840,6 +865,7 @@ public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception { assertEquals("Reads sent to observer", 2, rpcCountForObserver); break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: // 4 msyncs expected. 2 for the first read, and 2 for the third read // after the auto-msync period has elapsed during the sleep. assertEquals("Msyncs sent to the active namenodes", @@ -856,8 +882,10 @@ public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception { @ParameterizedTest public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception { Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); - clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." + - routerContext.getRouter().getRpcServerAddress().getHostName(), 3000); + String configKeySuffix = + configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000); fileSystem = routerContext.getFileSystem(clientConfiguration); List namenodes = routerContext @@ -890,6 +918,7 @@ public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) t assertEquals("Read sent to observer", 1, rpcCountForObserver); break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: // 5 calls to the active namenodes expected. 4 msync and a mkdir. // Each of the 2 reads results in an msync to 2 nameservices. // The mkdir also goes to the active.