HDFS-13848. Refactor NameNode failover proxy providers. Contributed by Konstantin Shvachko.
This commit is contained in:
parent
a5eba25506
commit
a4121c71c2
@ -30,27 +30,30 @@
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public interface FailoverProxyProvider<T> extends Closeable {
|
||||
public static final class ProxyInfo<T> {
|
||||
public final T proxy;
|
||||
static class ProxyInfo<T> {
|
||||
public T proxy;
|
||||
/*
|
||||
* The information (e.g., the IP address) of the current proxy object. It
|
||||
* provides information for debugging purposes.
|
||||
*/
|
||||
public final String proxyInfo;
|
||||
public String proxyInfo;
|
||||
|
||||
public ProxyInfo(T proxy, String proxyInfo) {
|
||||
this.proxy = proxy;
|
||||
this.proxyInfo = proxyInfo;
|
||||
}
|
||||
|
||||
private String proxyName() {
|
||||
return proxy != null ? proxy.getClass().getSimpleName() : "UnknownProxy";
|
||||
}
|
||||
|
||||
public String getString(String methodName) {
|
||||
return proxy.getClass().getSimpleName() + "." + methodName
|
||||
+ " over " + proxyInfo;
|
||||
return proxyName() + "." + methodName + " over " + proxyInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return proxy.getClass().getSimpleName() + " over " + proxyInfo;
|
||||
return proxyName() + " over " + proxyInfo;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,14 +18,68 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||
FailoverProxyProvider <T> {
|
||||
protected static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractNNFailoverProxyProvider.class);
|
||||
|
||||
private AtomicBoolean fallbackToSimpleAuth;
|
||||
protected Configuration conf;
|
||||
protected Class<T> xface;
|
||||
protected HAProxyFactory<T> factory;
|
||||
protected UserGroupInformation ugi;
|
||||
protected AtomicBoolean fallbackToSimpleAuth;
|
||||
|
||||
protected AbstractNNFailoverProxyProvider() {
|
||||
}
|
||||
|
||||
protected AbstractNNFailoverProxyProvider(Configuration conf, URI uri,
|
||||
Class<T> xface, HAProxyFactory<T> factory) {
|
||||
this.conf = new Configuration(conf);
|
||||
this.xface = xface;
|
||||
this.factory = factory;
|
||||
try {
|
||||
this.ugi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
int maxRetries = this.conf.getInt(
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
maxRetries);
|
||||
|
||||
int maxRetriesOnSocketTimeouts = this.conf.getInt(
|
||||
HdfsClientConfigKeys
|
||||
.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
HdfsClientConfigKeys
|
||||
.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic
|
||||
.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
maxRetriesOnSocketTimeouts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inquire whether logical HA URI is used for the implementation. If it is
|
||||
@ -51,4 +105,100 @@ public synchronized void setFallbackToSimpleAuth(
|
||||
public synchronized AtomicBoolean getFallbackToSimpleAuth() {
|
||||
return fallbackToSimpleAuth;
|
||||
}
|
||||
|
||||
/**
|
||||
* ProxyInfo to a NameNode. Includes its address.
|
||||
*/
|
||||
public static class NNProxyInfo<T> extends ProxyInfo<T> {
|
||||
private InetSocketAddress address;
|
||||
|
||||
public NNProxyInfo(InetSocketAddress address) {
|
||||
super(null, address.toString());
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
public InetSocketAddress getAddress() {
|
||||
return address;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<T> getInterface() {
|
||||
return xface;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a proxy if it has not been created yet.
|
||||
*/
|
||||
protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
|
||||
if (pi.proxy == null) {
|
||||
assert pi.getAddress() != null : "Proxy address is null";
|
||||
try {
|
||||
pi.proxy = factory.createProxy(conf,
|
||||
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("{} Failed to create RPC proxy to NameNode",
|
||||
this.getClass().getSimpleName(), ioe);
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
}
|
||||
return pi;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get list of configured NameNode proxy addresses.
|
||||
* Randomize the list if requested.
|
||||
*/
|
||||
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
|
||||
final List<NNProxyInfo<T>> proxies = new ArrayList<NNProxyInfo<T>>();
|
||||
Map<String, Map<String, InetSocketAddress>> map =
|
||||
DFSUtilClient.getAddresses(conf, null, addressKey);
|
||||
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
|
||||
|
||||
if (addressesInNN == null || addressesInNN.size() == 0) {
|
||||
throw new RuntimeException("Could not find any configured addresses " +
|
||||
"for URI " + uri);
|
||||
}
|
||||
|
||||
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
|
||||
for (InetSocketAddress address : addressesOfNns) {
|
||||
proxies.add(new NNProxyInfo<T>(address));
|
||||
}
|
||||
// Randomize the list to prevent all clients pointing to the same one
|
||||
boolean randomized = getRandomOrder(conf, uri);
|
||||
if (randomized) {
|
||||
Collections.shuffle(proxies);
|
||||
}
|
||||
|
||||
// The client may have a delegation token set for the logical
|
||||
// URI of the cluster. Clone this token to apply to each of the
|
||||
// underlying IPC addresses so that the IPC code can find it.
|
||||
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
|
||||
return proxies;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether random order is configured for failover proxy provider
|
||||
* for the namenode/nameservice.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nameNodeUri The URI of namenode/nameservice
|
||||
* @return random order configuration
|
||||
*/
|
||||
public static boolean getRandomOrder(
|
||||
Configuration conf, URI nameNodeUri) {
|
||||
String host = nameNodeUri.getHost();
|
||||
String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER
|
||||
+ "." + host;
|
||||
|
||||
if (conf.get(configKeyWithHost) != null) {
|
||||
return conf.getBoolean(
|
||||
configKeyWithHost,
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
|
||||
}
|
||||
|
||||
return conf.getBoolean(
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER,
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
|
||||
}
|
||||
}
|
||||
|
@ -19,23 +19,11 @@
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
|
||||
@ -48,17 +36,9 @@
|
||||
public class ConfiguredFailoverProxyProvider<T> extends
|
||||
AbstractNNFailoverProxyProvider<T> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
|
||||
|
||||
protected final Configuration conf;
|
||||
protected final List<AddressRpcProxyPair<T>> proxies =
|
||||
new ArrayList<AddressRpcProxyPair<T>>();
|
||||
protected final UserGroupInformation ugi;
|
||||
protected final Class<T> xface;
|
||||
protected final List<NNProxyInfo<T>> proxies;
|
||||
|
||||
private int currentProxyIndex = 0;
|
||||
protected final HAProxyFactory<T> factory;
|
||||
|
||||
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
|
||||
Class<T> xface, HAProxyFactory<T> factory) {
|
||||
@ -67,83 +47,8 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
|
||||
|
||||
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
|
||||
Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
|
||||
this.xface = xface;
|
||||
this.conf = new Configuration(conf);
|
||||
int maxRetries = this.conf.getInt(
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
maxRetries);
|
||||
|
||||
int maxRetriesOnSocketTimeouts = this.conf.getInt(
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic
|
||||
.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
maxRetriesOnSocketTimeouts);
|
||||
|
||||
try {
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
|
||||
Map<String, Map<String, InetSocketAddress>> map =
|
||||
DFSUtilClient.getAddresses(conf, null, addressKey);
|
||||
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
|
||||
|
||||
if (addressesInNN == null || addressesInNN.size() == 0) {
|
||||
throw new RuntimeException("Could not find any configured addresses " +
|
||||
"for URI " + uri);
|
||||
}
|
||||
|
||||
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
|
||||
for (InetSocketAddress address : addressesOfNns) {
|
||||
proxies.add(new AddressRpcProxyPair<T>(address));
|
||||
}
|
||||
// Randomize the list to prevent all clients pointing to the same one
|
||||
boolean randomized = getRandomOrder(conf, uri);
|
||||
if (randomized) {
|
||||
Collections.shuffle(proxies);
|
||||
}
|
||||
|
||||
// The client may have a delegation token set for the logical
|
||||
// URI of the cluster. Clone this token to apply to each of the
|
||||
// underlying IPC addresses so that the IPC code can find it.
|
||||
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
|
||||
this.factory = factory;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether random order is configured for failover proxy provider
|
||||
* for the namenode/nameservice.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nameNodeUri The URI of namenode/nameservice
|
||||
* @return random order configuration
|
||||
*/
|
||||
private static boolean getRandomOrder(
|
||||
Configuration conf, URI nameNodeUri) {
|
||||
String host = nameNodeUri.getHost();
|
||||
String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER
|
||||
+ "." + host;
|
||||
|
||||
if (conf.get(configKeyWithHost) != null) {
|
||||
return conf.getBoolean(
|
||||
configKeyWithHost,
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
|
||||
}
|
||||
|
||||
return conf.getBoolean(
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER,
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<T> getInterface() {
|
||||
return xface;
|
||||
super(conf, uri, xface, factory);
|
||||
this.proxies = getProxyAddresses(uri, addressKey);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -151,21 +56,8 @@ public Class<T> getInterface() {
|
||||
*/
|
||||
@Override
|
||||
public synchronized ProxyInfo<T> getProxy() {
|
||||
AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
|
||||
return getProxy(current);
|
||||
}
|
||||
|
||||
protected ProxyInfo<T> getProxy(AddressRpcProxyPair<T> current) {
|
||||
if (current.namenode == null) {
|
||||
try {
|
||||
current.namenode = factory.createProxy(conf,
|
||||
current.address, xface, ugi, false, getFallbackToSimpleAuth());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create RPC proxy to NameNode", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return new ProxyInfo<T>(current.namenode, current.address.toString());
|
||||
NNProxyInfo<T> current = proxies.get(currentProxyIndex);
|
||||
return createProxyIfNeeded(current);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -177,31 +69,18 @@ synchronized void incrementProxyIndex() {
|
||||
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* A little pair object to store the address and connected RPC proxy object to
|
||||
* an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
|
||||
*/
|
||||
protected static class AddressRpcProxyPair<T> {
|
||||
public final InetSocketAddress address;
|
||||
public T namenode;
|
||||
|
||||
public AddressRpcProxyPair(InetSocketAddress address) {
|
||||
this.address = address;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all the proxy objects which have been opened over the lifetime of
|
||||
* this proxy provider.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
for (AddressRpcProxyPair<T> proxy : proxies) {
|
||||
if (proxy.namenode != null) {
|
||||
if (proxy.namenode instanceof Closeable) {
|
||||
((Closeable)proxy.namenode).close();
|
||||
for (ProxyInfo<T> proxy : proxies) {
|
||||
if (proxy.proxy != null) {
|
||||
if (proxy.proxy instanceof Closeable) {
|
||||
((Closeable)proxy.proxy).close();
|
||||
} else {
|
||||
RPC.stopProxy(proxy.namenode);
|
||||
RPC.stopProxy(proxy.proxy);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,15 +19,11 @@
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* A NNFailoverProxyProvider implementation which works on IP failover setup.
|
||||
@ -47,53 +43,18 @@
|
||||
*/
|
||||
public class IPFailoverProxyProvider<T> extends
|
||||
AbstractNNFailoverProxyProvider<T> {
|
||||
private final Configuration conf;
|
||||
private final Class<T> xface;
|
||||
private final URI nameNodeUri;
|
||||
private final HAProxyFactory<T> factory;
|
||||
private ProxyInfo<T> nnProxyInfo = null;
|
||||
private final NNProxyInfo<T> nnProxyInfo;
|
||||
|
||||
public IPFailoverProxyProvider(Configuration conf, URI uri,
|
||||
Class<T> xface, HAProxyFactory<T> factory) {
|
||||
this.xface = xface;
|
||||
this.nameNodeUri = uri;
|
||||
this.factory = factory;
|
||||
|
||||
this.conf = new Configuration(conf);
|
||||
int maxRetries = this.conf.getInt(
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
maxRetries);
|
||||
|
||||
int maxRetriesOnSocketTimeouts = this.conf.getInt(
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
maxRetriesOnSocketTimeouts);
|
||||
super(conf, uri, xface, factory);
|
||||
this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<T> getInterface() {
|
||||
return xface;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ProxyInfo<T> getProxy() {
|
||||
public synchronized NNProxyInfo<T> getProxy() {
|
||||
// Create a non-ha proxy if not already created.
|
||||
if (nnProxyInfo == null) {
|
||||
try {
|
||||
// Create a proxy that is not wrapped in RetryProxy
|
||||
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
|
||||
nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
|
||||
UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
}
|
||||
return nnProxyInfo;
|
||||
return createProxyIfNeeded(nnProxyInfo);
|
||||
}
|
||||
|
||||
/** Nothing to do for IP failover */
|
||||
@ -106,7 +67,7 @@ public void performFailover(T currentProxy) {
|
||||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (nnProxyInfo == null) {
|
||||
if (nnProxyInfo.proxy == null) {
|
||||
return;
|
||||
}
|
||||
if (nnProxyInfo.proxy instanceof Closeable) {
|
||||
|
Loading…
Reference in New Issue
Block a user