HDFS-11538. Move ClientProtocol HA proxies into hadoop-hdfs-client. Contributed by Huafeng Wang.

This commit is contained in:
Andrew Wang 2017-04-04 23:05:24 -07:00
parent 19b89c4c7b
commit 9e0e430f18
19 changed files with 311 additions and 229 deletions

View File

@ -169,6 +169,19 @@ static String addSuffix(String key, String suffix) {
return key + "." + suffix; return key + "." + suffix;
} }
/**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return DFSUtilClient.getAddresses(conf, null,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/** /**
* Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
* the configuration. * the configuration.

View File

@ -20,15 +20,29 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
@InterfaceAudience.Private @InterfaceAudience.Private
public class HAUtilClient { public class HAUtilClient {
private static final Logger LOG = LoggerFactory.getLogger(HAUtilClient.class);
private static final DelegationTokenSelector tokenSelector =
new DelegationTokenSelector();
/** /**
* @return true if the given nameNodeUri appears to be a logical URI. * @return true if the given nameNodeUri appears to be a logical URI.
*/ */
@ -92,4 +106,45 @@ public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
public static boolean isTokenForLogicalUri(Token<?> token) { public static boolean isTokenForLogicalUri(Token<?> token) {
return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX); return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
} }
/**
* Locate a delegation token associated with the given HA cluster URI, and if
* one is found, clone it to also represent the underlying namenode address.
* @param ugi the UGI to modify
* @param haUri the logical URI for the cluster
* @param nnAddrs collection of NNs in the cluster to which the token
* applies
*/
public static void cloneDelegationTokenForLogicalUri(
UserGroupInformation ugi, URI haUri,
Collection<InetSocketAddress> nnAddrs) {
// this cloning logic is only used by hdfs
Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
HdfsConstants.HDFS_URI_SCHEME);
Token<DelegationTokenIdentifier> haToken =
tokenSelector.selectToken(haService, ugi.getTokens());
if (haToken != null) {
for (InetSocketAddress singleNNAddr : nnAddrs) {
// this is a minor hack to prevent physical HA tokens from being
// exposed to the user via UGI.getCredentials(), otherwise these
// cloned tokens may be inadvertently propagated to jobs
Token<DelegationTokenIdentifier> specificToken =
haToken.privateClone(buildTokenService(singleNNAddr));
Text alias = new Text(
HAUtilClient.buildTokenServicePrefixForLogicalUri(
HdfsConstants.HDFS_URI_SCHEME)
+ "//" + specificToken.getService());
ugi.addToken(alias, specificToken);
if (LOG.isDebugEnabled()) {
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No HA service delegation token found for logical URI " +
haUri);
}
}
}
} }

View File

@ -28,6 +28,8 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -212,6 +214,14 @@ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth) throws IOException {
return createFailoverProxyProvider(conf, nameNodeUri, xface, checkPort,
fallbackToSimpleAuth, new ClientHAProxyFactory<T>());
}
protected static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth, HAProxyFactory<T> proxyFactory)
throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
AbstractNNFailoverProxyProvider<T> providerNN; AbstractNNFailoverProxyProvider<T> providerNN;
try { try {
@ -223,9 +233,10 @@ public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider
} }
// Create a proxy provider instance. // Create a proxy provider instance.
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class); .getConstructor(Configuration.class, URI.class,
Class.class, HAProxyFactory.class);
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
xface); xface, proxyFactory);
// If the proxy provider is of an old implementation, wrap it. // If the proxy provider is of an old implementation, wrap it.
if (!(provider instanceof AbstractNNFailoverProxyProvider)) { if (!(provider instanceof AbstractNNFailoverProxyProvider)) {

View File

@ -67,6 +67,7 @@ public interface HdfsClientConfigKeys {
String PREFIX = "dfs.client."; String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices"; String DFS_NAMESERVICES = "dfs.nameservices";
String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870; int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
@Override
@SuppressWarnings("unchecked")
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
nnAddr, conf, ugi, false, fallbackToSimpleAuth);
}
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries)
throws IOException {
return createProxy(conf, nnAddr, xface, ugi, withRetries, null);
}
}

View File

@ -26,22 +26,16 @@
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting; import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/** /**
* A FailoverProxyProvider implementation which allows one to configure * A FailoverProxyProvider implementation which allows one to configure
@ -51,25 +45,9 @@
*/ */
public class ConfiguredFailoverProxyProvider<T> extends public class ConfiguredFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> { AbstractNNFailoverProxyProvider<T> {
private static final Log LOG =
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
interface ProxyFactory<T> {
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException;
}
static class DefaultProxyFactory<T> implements ProxyFactory<T> { private static final Logger LOG =
@Override LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return NameNodeProxies.createNonHAProxy(conf,
nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
}
}
protected final Configuration conf; protected final Configuration conf;
protected final List<AddressRpcProxyPair<T>> proxies = protected final List<AddressRpcProxyPair<T>> proxies =
@ -78,22 +56,11 @@ public T createProxy(Configuration conf, InetSocketAddress nnAddr,
protected final Class<T> xface; protected final Class<T> xface;
private int currentProxyIndex = 0; private int currentProxyIndex = 0;
private final ProxyFactory<T> factory; private final HAProxyFactory<T> factory;
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) { Class<T> xface, HAProxyFactory<T> factory) {
this(conf, uri, xface, new DefaultProxyFactory<T>());
}
@VisibleForTesting
ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, ProxyFactory<T> factory) {
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface class %s is not a valid NameNode protocol!");
this.xface = xface; this.xface = xface;
this.conf = new Configuration(conf); this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt( int maxRetries = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
@ -101,7 +68,7 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
this.conf.setInt( this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
maxRetries); maxRetries);
int maxRetriesOnSocketTimeouts = this.conf.getInt( int maxRetriesOnSocketTimeouts = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
@ -112,16 +79,16 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
try { try {
ugi = UserGroupInformation.getCurrentUser(); ugi = UserGroupInformation.getCurrentUser();
Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses( Map<String, Map<String, InetSocketAddress>> map =
conf); DFSUtilClient.getHaNnRpcAddresses(conf);
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost()); Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
if (addressesInNN == null || addressesInNN.size() == 0) { if (addressesInNN == null || addressesInNN.size() == 0) {
throw new RuntimeException("Could not find any configured addresses " + throw new RuntimeException("Could not find any configured addresses " +
"for URI " + uri); "for URI " + uri);
} }
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values(); Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
for (InetSocketAddress address : addressesOfNns) { for (InetSocketAddress address : addressesOfNns) {
proxies.add(new AddressRpcProxyPair<T>(address)); proxies.add(new AddressRpcProxyPair<T>(address));
@ -137,13 +104,13 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
// The client may have a delegation token set for the logical // The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the // URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it. // underlying IPC addresses so that the IPC code can find it.
HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
this.factory = factory; this.factory = factory;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Override @Override
public Class<T> getInterface() { public Class<T> getInterface() {
return xface; return xface;
@ -183,7 +150,7 @@ synchronized void incrementProxyIndex() {
private static class AddressRpcProxyPair<T> { private static class AddressRpcProxyPair<T> {
public final InetSocketAddress address; public final InetSocketAddress address;
public T namenode; public T namenode;
public AddressRpcProxyPair(InetSocketAddress address) { public AddressRpcProxyPair(InetSocketAddress address) {
this.address = address; this.address = address;
} }

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This interface aims to decouple the proxy creation implementation that used
* in {@link AbstractNNFailoverProxyProvider}. Client side can use
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} to initialize the
* proxy while the server side can use NamenodeProtocols
*/
@InterfaceAudience.Private
public interface HAProxyFactory<T> {
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException;
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException;
}

View File

@ -25,14 +25,10 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
/** /**
* A NNFailoverProxyProvider implementation which works on IP failover setup. * A NNFailoverProxyProvider implementation which works on IP failover setup.
* Only one proxy is used to connect to both servers and switching between * Only one proxy is used to connect to both servers and switching between
@ -40,7 +36,7 @@
* clients can consistently reach only one node at a time. * clients can consistently reach only one node at a time.
* *
* Clients with a live connection will likely get connection reset after an * Clients with a live connection will likely get connection reset after an
* IP failover. This case will be handled by the * IP failover. This case will be handled by the
* FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is
* not idempotent, it won't get retried. * not idempotent, it won't get retried.
* *
@ -54,15 +50,14 @@ public class IPFailoverProxyProvider<T> extends
private final Configuration conf; private final Configuration conf;
private final Class<T> xface; private final Class<T> xface;
private final URI nameNodeUri; private final URI nameNodeUri;
private final HAProxyFactory<T> factory;
private ProxyInfo<T> nnProxyInfo = null; private ProxyInfo<T> nnProxyInfo = null;
public IPFailoverProxyProvider(Configuration conf, URI uri, public IPFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) { Class<T> xface, HAProxyFactory<T> factory) {
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface class %s is not a valid NameNode protocol!");
this.xface = xface; this.xface = xface;
this.nameNodeUri = uri; this.nameNodeUri = uri;
this.factory = factory;
this.conf = new Configuration(conf); this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt( int maxRetries = this.conf.getInt(
@ -71,7 +66,7 @@ public IPFailoverProxyProvider(Configuration conf, URI uri,
this.conf.setInt( this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
maxRetries); maxRetries);
int maxRetriesOnSocketTimeouts = this.conf.getInt( int maxRetriesOnSocketTimeouts = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
@ -79,7 +74,7 @@ public IPFailoverProxyProvider(Configuration conf, URI uri,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
maxRetriesOnSocketTimeouts); maxRetriesOnSocketTimeouts);
} }
@Override @Override
public Class<T> getInterface() { public Class<T> getInterface() {
return xface; return xface;
@ -92,9 +87,8 @@ public synchronized ProxyInfo<T> getProxy() {
try { try {
// Create a proxy that is not wrapped in RetryProxy // Create a proxy that is not wrapped in RetryProxy
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
nnProxyInfo = new ProxyInfo<T>(NameNodeProxies.createNonHAProxy( nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
conf, nnAddr, xface, UserGroupInformation.getCurrentUser(), UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
false).getProxy(), nnAddr.toString());
} catch (IOException ioe) { } catch (IOException ioe) {
throw new RuntimeException(ioe); throw new RuntimeException(ioe);
} }

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.MultiException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -147,15 +146,9 @@ public Object call() throws Exception {
private volatile ProxyInfo<T> successfulProxy = null; private volatile ProxyInfo<T> successfulProxy = null;
private volatile String toIgnore = null; private volatile String toIgnore = null;
public RequestHedgingProxyProvider( public RequestHedgingProxyProvider(Configuration conf, URI uri,
Configuration conf, URI uri, Class<T> xface) { Class<T> xface, HAProxyFactory<T> proxyFactory) {
this(conf, uri, xface, new DefaultProxyFactory<T>()); super(conf, uri, xface, proxyFactory);
}
@VisibleForTesting
RequestHedgingProxyProvider(Configuration conf, URI uri,
Class<T> xface, ProxyFactory<T> factory) {
super(conf, uri, xface, factory);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -29,9 +29,8 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
@ -66,20 +65,20 @@ public void setup() throws URISyntaxException {
ns = "mycluster-" + Time.monotonicNow(); ns = "mycluster-" + Time.monotonicNow();
nnUri = new URI("hdfs://" + ns); nnUri = new URI("hdfs://" + ns);
conf = new Configuration(); conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns); conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
conf.set( conf.set(
DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2"); HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
conf.set( conf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1", HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
"machine1.foo.bar:9820"); "machine1.foo.bar:9820");
conf.set( conf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2", HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
"machine2.foo.bar:9820"); "machine2.foo.bar:9820");
} }
@Test @Test
public void testHedgingWhenOneFails() throws Exception { public void testHedgingWhenOneFails() throws Exception {
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -87,11 +86,11 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
return new long[]{1}; return new long[]{1};
} }
}); });
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(badMock, goodMock)); createFactory(badMock, goodMock));
long[] stats = provider.getProxy().proxy.getStats(); long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1); Assert.assertTrue(stats.length == 1);
@ -101,7 +100,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test @Test
public void testHedgingWhenOneIsSlow() throws Exception { public void testHedgingWhenOneIsSlow() throws Exception {
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -109,11 +108,11 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
return new long[]{1}; return new long[]{1};
} }
}); });
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(goodMock, badMock)); createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats(); long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1); Assert.assertTrue(stats.length == 1);
@ -124,14 +123,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test @Test
public void testHedgingWhenBothFail() throws Exception { public void testHedgingWhenBothFail() throws Exception {
NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
Mockito.when(worseMock.getStats()).thenThrow( Mockito.when(worseMock.getStats()).thenThrow(
new IOException("Worse mock !!")); new IOException("Worse mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(badMock, worseMock)); createFactory(badMock, worseMock));
try { try {
provider.getProxy().proxy.getStats(); provider.getProxy().proxy.getStats();
@ -147,7 +146,7 @@ public void testHedgingWhenBothFail() throws Exception {
public void testPerformFailover() throws Exception { public void testPerformFailover() throws Exception {
final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger counter = new AtomicInteger(0);
final int[] isGood = {1}; final int[] isGood = {1};
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -159,7 +158,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Was Good mock !!"); throw new IOException("Was Good mock !!");
} }
}); });
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -171,8 +170,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Bad mock !!"); throw new IOException("Bad mock !!");
} }
}); });
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(goodMock, badMock)); createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats(); long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1); Assert.assertTrue(stats.length == 1);
@ -234,14 +233,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test @Test
public void testPerformFailoverWith3Proxies() throws Exception { public void testPerformFailoverWith3Proxies() throws Exception {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
"nn1,nn2,nn3"); "nn1,nn2,nn3");
conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3", conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
"machine3.foo.bar:9820"); "machine3.foo.bar:9820");
final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger counter = new AtomicInteger(0);
final int[] isGood = {1}; final int[] isGood = {1};
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -253,7 +252,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Was Good mock !!"); throw new IOException("Was Good mock !!");
} }
}); });
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -265,7 +264,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Bad mock !!"); throw new IOException("Bad mock !!");
} }
}); });
final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); final ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() { Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override @Override
public long[] answer(InvocationOnMock invocation) throws Throwable { public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -278,8 +277,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
} }
}); });
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(goodMock, badMock, worseMock)); createFactory(goodMock, badMock, worseMock));
long[] stats = provider.getProxy().proxy.getStats(); long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1); Assert.assertTrue(stats.length == 1);
@ -355,14 +354,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test @Test
public void testHedgingWhenFileNotFoundException() throws Exception { public void testHedgingWhenFileNotFoundException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito Mockito
.when(active.getBlockLocations(Matchers.anyString(), .when(active.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong())) Matchers.anyLong(), Matchers.anyLong()))
.thenThrow(new RemoteException("java.io.FileNotFoundException", .thenThrow(new RemoteException("java.io.FileNotFoundException",
"File does not exist!")); "File does not exist!"));
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito Mockito
.when(standby.getBlockLocations(Matchers.anyString(), .when(standby.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong())) Matchers.anyLong(), Matchers.anyLong()))
@ -370,9 +369,9 @@ public void testHedgingWhenFileNotFoundException() throws Exception {
new RemoteException("org.apache.hadoop.ipc.StandbyException", new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode")); "Standby NameNode"));
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby)); ClientProtocol.class, createFactory(active, standby));
try { try {
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
Assert.fail("Should fail since the active namenode throws" Assert.fail("Should fail since the active namenode throws"
@ -394,18 +393,18 @@ public void testHedgingWhenFileNotFoundException() throws Exception {
@Test @Test
public void testHedgingWhenConnectException() throws Exception { public void testHedgingWhenConnectException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito.when(active.getStats()).thenThrow(new ConnectException()); Mockito.when(active.getStats()).thenThrow(new ConnectException());
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito.when(standby.getStats()) Mockito.when(standby.getStats())
.thenThrow( .thenThrow(
new RemoteException("org.apache.hadoop.ipc.StandbyException", new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode")); "Standby NameNode"));
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby)); ClientProtocol.class, createFactory(active, standby));
try { try {
provider.getProxy().proxy.getStats(); provider.getProxy().proxy.getStats();
Assert.fail("Should fail since the active namenode throws" Assert.fail("Should fail since the active namenode throws"
@ -428,15 +427,15 @@ public void testHedgingWhenConnectException() throws Exception {
@Test @Test
public void testHedgingWhenConnectAndEOFException() throws Exception { public void testHedgingWhenConnectAndEOFException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito.when(active.getStats()).thenThrow(new EOFException()); Mockito.when(active.getStats()).thenThrow(new EOFException());
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito.when(standby.getStats()).thenThrow(new ConnectException()); Mockito.when(standby.getStats()).thenThrow(new ConnectException());
RequestHedgingProxyProvider<NamenodeProtocols> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby)); ClientProtocol.class, createFactory(active, standby));
try { try {
provider.getProxy().proxy.getStats(); provider.getProxy().proxy.getStats();
Assert.fail("Should fail since both active and standby namenodes throw" Assert.fail("Should fail since both active and standby namenodes throw"
@ -453,18 +452,25 @@ public void testHedgingWhenConnectAndEOFException() throws Exception {
Mockito.verify(standby).getStats(); Mockito.verify(standby).getStats();
} }
private ProxyFactory<NamenodeProtocols> createFactory( private HAProxyFactory<ClientProtocol> createFactory(
NamenodeProtocols... protos) { ClientProtocol... protos) {
final Iterator<NamenodeProtocols> iterator = final Iterator<ClientProtocol> iterator =
Lists.newArrayList(protos).iterator(); Lists.newArrayList(protos).iterator();
return new ProxyFactory<NamenodeProtocols>() { return new HAProxyFactory<ClientProtocol>() {
@Override @Override
public NamenodeProtocols createProxy(Configuration conf, public ClientProtocol createProxy(Configuration conf,
InetSocketAddress nnAddr, Class<NamenodeProtocols> xface, InetSocketAddress nnAddr, Class<ClientProtocol> xface,
UserGroupInformation ugi, boolean withRetries, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException { AtomicBoolean fallbackToSimpleAuth) throws IOException {
return iterator.next(); return iterator.next();
} }
@Override
public ClientProtocol createProxy(Configuration conf,
InetSocketAddress nnAddr, Class<ClientProtocol> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
return iterator.next();
}
}; };
} }
} }

View File

@ -142,7 +142,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT; public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host"; public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; public static final String DFS_NAMENODE_RPC_ADDRESS_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host"; public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address"; public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host"; public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";

View File

@ -449,19 +449,6 @@ public static Set<String> getAllNnPrincipals(Configuration conf) throws IOExcept
return principals; return principals;
} }
/**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return DFSUtilClient.getAddresses(conf, null,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/** /**
* Returns list of InetSocketAddress corresponding to backup node rpc * Returns list of InetSocketAddress corresponding to backup node rpc
* addresses from the configuration. * addresses from the configuration.
@ -693,7 +680,7 @@ public static String addressMapToString(
public static String nnAddressesAsString(Configuration conf) { public static String nnAddressesAsString(Configuration conf) {
Map<String, Map<String, InetSocketAddress>> addresses = Map<String, Map<String, InetSocketAddress>> addresses =
getHaNnRpcAddresses(conf); DFSUtilClient.getHaNnRpcAddresses(conf);
return addressMapToString(addresses); return addressMapToString(addresses);
} }

View File

@ -29,7 +29,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -39,8 +38,6 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -48,17 +45,12 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -67,12 +59,6 @@
@InterfaceAudience.Private @InterfaceAudience.Private
public class HAUtil { public class HAUtil {
private static final Log LOG =
LogFactory.getLog(HAUtil.class);
private static final DelegationTokenSelector tokenSelector =
new DelegationTokenSelector();
private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String[]{ private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String[]{
DFS_NAMENODE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_BIND_HOST_KEY, DFS_NAMENODE_RPC_BIND_HOST_KEY,
@ -97,7 +83,7 @@ private HAUtil() { /* Hidden constructor */ }
*/ */
public static boolean isHAEnabled(Configuration conf, String nsId) { public static boolean isHAEnabled(Configuration conf, String nsId) {
Map<String, Map<String, InetSocketAddress>> addresses = Map<String, Map<String, InetSocketAddress>> addresses =
DFSUtil.getHaNnRpcAddresses(conf); DFSUtilClient.getHaNnRpcAddresses(conf);
if (addresses == null) return false; if (addresses == null) return false;
Map<String, InetSocketAddress> nnMap = addresses.get(nsId); Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
return nnMap != null && nnMap.size() > 1; return nnMap != null && nnMap.size() > 1;
@ -259,47 +245,6 @@ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
return provider.useLogicalURI(); return provider.useLogicalURI();
} }
/**
* Locate a delegation token associated with the given HA cluster URI, and if
* one is found, clone it to also represent the underlying namenode address.
* @param ugi the UGI to modify
* @param haUri the logical URI for the cluster
* @param nnAddrs collection of NNs in the cluster to which the token
* applies
*/
public static void cloneDelegationTokenForLogicalUri(
UserGroupInformation ugi, URI haUri,
Collection<InetSocketAddress> nnAddrs) {
// this cloning logic is only used by hdfs
Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
HdfsConstants.HDFS_URI_SCHEME);
Token<DelegationTokenIdentifier> haToken =
tokenSelector.selectToken(haService, ugi.getTokens());
if (haToken != null) {
for (InetSocketAddress singleNNAddr : nnAddrs) {
// this is a minor hack to prevent physical HA tokens from being
// exposed to the user via UGI.getCredentials(), otherwise these
// cloned tokens may be inadvertently propagated to jobs
Token<DelegationTokenIdentifier> specificToken =
haToken.privateClone(buildTokenService(singleNNAddr));
Text alias = new Text(
HAUtilClient.buildTokenServicePrefixForLogicalUri(
HdfsConstants.HDFS_URI_SCHEME)
+ "//" + specificToken.getService());
ugi.addToken(alias, specificToken);
if (LOG.isDebugEnabled()) {
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No HA service delegation token found for logical URI " +
haUri);
}
}
}
/** /**
* Get the internet address of the currently-active NN. This should rarely be * Get the internet address of the currently-active NN. This should rarely be
* used, since callers of this method who connect directly to the NN using the * used, since callers of this method who connect directly to the NN using the

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -112,7 +113,7 @@ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
throws IOException { throws IOException {
AbstractNNFailoverProxyProvider<T> failoverProxyProvider = AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri, NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
xface, true, fallbackToSimpleAuth); xface, true, fallbackToSimpleAuth, new NameNodeHAProxyFactory<T>());
if (failoverProxyProvider == null) { if (failoverProxyProvider == null) {
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri), return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),

View File

@ -17,19 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -63,25 +57,6 @@ protected void writeXml(Exception except, String path, XMLOutputter doc)
doc.endTag(); doc.endTag();
} }
/**
* Create a {@link NameNode} proxy from the current {@link ServletContext}.
*/
protected ClientProtocol createNameNodeProxy() throws IOException {
ServletContext context = getServletContext();
// if we are running in the Name Node, use it directly rather than via
// rpc
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
if (nn != null) {
return nn.getRpcServer();
}
InetSocketAddress nnAddr =
NameNodeHttpServer.getNameNodeAddressFromContext(context);
Configuration conf = new HdfsConfiguration(
NameNodeHttpServer.getConfFromContext(context));
return NameNodeProxies.createProxy(conf, DFSUtilClient.getNNUri(nnAddr),
ClientProtocol.class).getProxy();
}
protected UserGroupInformation getUGI(HttpServletRequest request, protected UserGroupInformation getUGI(HttpServletRequest request,
Configuration conf) throws IOException { Configuration conf) throws IOException {
return JspHelper.getUGI(getServletContext(), request, conf); return JspHelper.getUGI(getServletContext(), request, conf);

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries, fallbackToSimpleAuth).getProxy();
}
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries)
throws IOException {
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries).getProxy();
}
}

View File

@ -42,10 +42,10 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.FailoverProxyProvider;
@ -333,7 +333,7 @@ public static class DummyLegacyFailoverProxyProvider<T>
private Class<T> xface; private Class<T> xface;
private T proxy; private T proxy;
public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri, public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) { Class<T> xface, HAProxyFactory<T> proxyFactory) {
try { try {
this.proxy = NameNodeProxies.createNonHAProxy(conf, this.proxy = NameNodeProxies.createNonHAProxy(conf,
DFSUtilClient.getNNAddress(uri), xface, DFSUtilClient.getNNAddress(uri), xface,

View File

@ -513,7 +513,7 @@ public void testHANameNodesWithFederation() throws URISyntaxException {
NS2_NN2_HOST); NS2_NN2_HOST);
Map<String, Map<String, InetSocketAddress>> map = Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getHaNnRpcAddresses(conf); DFSUtilClient.getHaNnRpcAddresses(conf);
assertTrue(HAUtil.isHAEnabled(conf, "ns1")); assertTrue(HAUtil.isHAEnabled(conf, "ns1"));
assertTrue(HAUtil.isHAEnabled(conf, "ns2")); assertTrue(HAUtil.isHAEnabled(conf, "ns2"));

View File

@ -292,7 +292,7 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
nn0.getNameNodeAddress().getPort())); nn0.getNameNodeAddress().getPort()));
nnAddrs.add(new InetSocketAddress("localhost", nnAddrs.add(new InetSocketAddress("localhost",
nn1.getNameNodeAddress().getPort())); nn1.getNameNodeAddress().getPort()));
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens(); Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
assertEquals(3, tokens.size()); assertEquals(3, tokens.size());
@ -321,7 +321,7 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
} }
// reclone the tokens, and see if they match now // reclone the tokens, and see if they match now
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
for (InetSocketAddress addr : nnAddrs) { for (InetSocketAddress addr : nnAddrs) {
Text ipcDtService = SecurityUtil.buildTokenService(addr); Text ipcDtService = SecurityUtil.buildTokenService(addr);
Token<DelegationTokenIdentifier> token2 = Token<DelegationTokenIdentifier> token2 =