HDFS-14017. [SBN read] ObserverReadProxyProviderWithIPFailover should work with HA configuration. Contributed by Chen Liang.
This commit is contained in:
parent
652b257478
commit
a3aab48df0
@ -192,6 +192,9 @@ public interface HdfsClientConfigKeys {
|
||||
String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
|
||||
"dfs.provided.aliasmap.inmemory.dnrpc-address";
|
||||
|
||||
String DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS =
|
||||
Failover.PREFIX + "ipfailover.virtual-address";
|
||||
|
||||
/**
|
||||
* These are deprecated config keys to client code.
|
||||
*/
|
||||
|
@ -17,24 +17,99 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS;
|
||||
|
||||
/**
|
||||
* ObserverReadProxyProvider with IPFailoverProxyProvider
|
||||
* as the failover method.
|
||||
* Extends {@link ObserverReadProxyProvider} to support NameNode IP failover.
|
||||
*
|
||||
* For Observer reads a client needs to know physical addresses of all
|
||||
* NameNodes, so that it could switch between active and observer nodes
|
||||
* for write and read requests.
|
||||
*
|
||||
* Traditional {@link IPFailoverProxyProvider} works with a virtual
|
||||
* address of the NameNode. If active NameNode fails the virtual address
|
||||
* is assigned to the standby NameNode, and IPFailoverProxyProvider, which
|
||||
* keeps talking to the same virtual address is in fact now connects to
|
||||
* the new physical server.
|
||||
*
|
||||
* To combine these behaviors ObserverReadProxyProviderWithIPFailover
|
||||
* should both
|
||||
* <ol>
|
||||
* <li> Maintain all physical addresses of NameNodes in order to allow
|
||||
* observer reads, and
|
||||
* <li> Should rely on the virtual address of the NameNode in order to
|
||||
* perform failover by assuming that the virtual address always points
|
||||
* to the active NameNode.
|
||||
* </ol>
|
||||
*
|
||||
* An example of a configuration to leverage
|
||||
* ObserverReadProxyProviderWithIPFailover
|
||||
* should include the following values:
|
||||
* <pre>{@code
|
||||
* fs.defaultFS = hdfs://mycluster
|
||||
* dfs.nameservices = mycluster
|
||||
* dfs.ha.namenodes.mycluster = ha1,ha2
|
||||
* dfs.namenode.rpc-address.mycluster.ha1 = nn01-ha1.com:8020
|
||||
* dfs.namenode.rpc-address.mycluster.ha2 = nn01-ha2.com:8020
|
||||
* dfs.client.failover.ipfailover.virtual-address.mycluster = nn01.com:8020
|
||||
* dfs.client.failover.proxy.provider.mycluster =
|
||||
* org.apache...ObserverReadProxyProviderWithIPFailover
|
||||
* }</pre>
|
||||
* Here {@code nn01.com:8020} is the virtual address of the active NameNode,
|
||||
* while {@code nn01-ha1.com:8020} and {@code nn01-ha2.com:8020}
|
||||
* are the physically addresses the two NameNodes.
|
||||
*
|
||||
* With this configuration, client will use
|
||||
* ObserverReadProxyProviderWithIPFailover, which creates proxies for both
|
||||
* nn01-ha1 and nn01-ha2, used for read/write RPC calls, but for the failover,
|
||||
* it relies on the virtual address nn01.com
|
||||
*/
|
||||
public class
|
||||
ObserverReadProxyProviderWithIPFailover<T extends ClientProtocol>
|
||||
extends ObserverReadProxyProvider<T> {
|
||||
|
||||
public class ObserverReadProxyProviderWithIPFailover<T extends ClientProtocol>
|
||||
extends ObserverReadProxyProvider<T> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
ObserverReadProxyProviderWithIPFailover.class);
|
||||
|
||||
/**
|
||||
* By default ObserverReadProxyProviderWithIPFailover
|
||||
* uses {@link IPFailoverProxyProvider} for failover.
|
||||
*/
|
||||
public ObserverReadProxyProviderWithIPFailover(
|
||||
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
|
||||
this(conf, uri, xface, factory,
|
||||
new IPFailoverProxyProvider<>(conf,
|
||||
getFailoverVirtualIP(conf, uri.getHost()), xface, factory));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useLogicalURI() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public ObserverReadProxyProviderWithIPFailover(
|
||||
Configuration conf, URI uri, Class<T> xface,
|
||||
HAProxyFactory<T> factory) throws IOException {
|
||||
super(conf, uri, xface, factory,
|
||||
new IPFailoverProxyProvider<T>(conf, uri, xface,factory));
|
||||
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
|
||||
AbstractNNFailoverProxyProvider<T> failoverProxy) {
|
||||
super(conf, uri, xface, factory, failoverProxy);
|
||||
}
|
||||
}
|
||||
|
||||
private static URI getFailoverVirtualIP(
|
||||
Configuration conf, String nameServiceID) {
|
||||
String configKey = DFS_CLIENT_FAILOVER_IPFAILOVER_VIRTUAL_ADDRESS
|
||||
+ "." + nameServiceID;
|
||||
String virtualIP = conf.get(configKey);
|
||||
LOG.info("Name service ID {} will use virtual IP {} for failover",
|
||||
nameServiceID, virtualIP);
|
||||
if (virtualIP == null) {
|
||||
throw new IllegalArgumentException("Virtual IP for failover not found,"
|
||||
+ "misconfigured " + configKey + "?");
|
||||
}
|
||||
return URI.create(virtualIP);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user