diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index f9b2e8d7c8..2e770cc1c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -169,6 +169,19 @@ static String addSuffix(String key, String suffix) {
     return key + "." + suffix;
   }
 
+  /**
+   * Returns the HA NN RPC addresses from the configuration, keyed by
+   * nameservice ID and then by namenode ID.
+   *
+   * @param conf configuration
+   * @return map of HA namenode RPC addresses per nameservice
+   */
+  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
+      Configuration conf) {
+    return DFSUtilClient.getAddresses(conf, null,
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+  }
+
   /**
    * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
    * the configuration.
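Note that the helper added above returns a nested map rather than a flat list: nameservice ID to (namenode ID to RPC address). A minimal sketch of resolving it on the client side; the nameservice name and hosts are illustrative, not taken from this patch:

```java
import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;

public class HaNnAddressesSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Illustrative HA layout: one nameservice with two namenodes.
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "machine1.foo.bar:9820");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "machine2.foo.bar:9820");

    // Roughly {mycluster={nn1=machine1.foo.bar:9820, nn2=machine2.foo.bar:9820}}
    Map<String, Map<String, InetSocketAddress>> addrs =
        DFSUtilClient.getHaNnRpcAddresses(conf);
    System.out.println(addrs);
  }
}
```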
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
index 9f28cfcde7..47288f77df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
@@ -20,15 +20,29 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Collection;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
+import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
 
 @InterfaceAudience.Private
 public class HAUtilClient {
+  private static final Logger LOG = LoggerFactory.getLogger(HAUtilClient.class);
+
+  private static final DelegationTokenSelector tokenSelector =
+      new DelegationTokenSelector();
+
   /**
    * @return true if the given nameNodeUri appears to be a logical URI.
    */
@@ -92,4 +106,45 @@ public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
   public static boolean isTokenForLogicalUri(Token<?> token) {
     return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
   }
+
+  /**
+   * Locate a delegation token associated with the given HA cluster URI, and if
+   * one is found, clone it to also represent the underlying namenode address.
+   *
+   * @param ugi the UGI to modify
+   * @param haUri the logical URI for the cluster
+   * @param nnAddrs collection of NNs in the cluster to which the token
+   *          applies
+   */
+  public static void cloneDelegationTokenForLogicalUri(
+      UserGroupInformation ugi, URI haUri,
+      Collection<InetSocketAddress> nnAddrs) {
+    // this cloning logic is only used by hdfs
+    Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
+        HdfsConstants.HDFS_URI_SCHEME);
+    Token<DelegationTokenIdentifier> haToken =
+        tokenSelector.selectToken(haService, ugi.getTokens());
+    if (haToken != null) {
+      for (InetSocketAddress singleNNAddr : nnAddrs) {
+        // this is a minor hack to prevent physical HA tokens from being
+        // exposed to the user via UGI.getCredentials(), otherwise these
+        // cloned tokens may be inadvertently propagated to jobs
+        Token<DelegationTokenIdentifier> specificToken =
+            haToken.privateClone(buildTokenService(singleNNAddr));
+        Text alias = new Text(
+            HAUtilClient.buildTokenServicePrefixForLogicalUri(
+                HdfsConstants.HDFS_URI_SCHEME)
+                + "//" + specificToken.getService());
+        ugi.addToken(alias, specificToken);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Mapped HA service delegation token for logical URI " +
+              haUri + " to namenode " + singleNNAddr);
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No HA service delegation token found for logical URI " +
+            haUri);
+      }
+    }
+  }
 }
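The cloning logic moved into HAUtilClient above is easiest to see end to end. A hedged sketch, modeled on the TestDelegationTokensWithHA changes near the bottom of this patch; the user name, service string, and ports are illustrative:

```java
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public class TokenCloneSketch {
  public static void main(String[] args) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice");

    // A delegation token whose service is the logical HA URI.
    Token<DelegationTokenIdentifier> haToken = new Token<>();
    haToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
    haToken.setService(new Text("ha-hdfs:mycluster"));
    ugi.addToken(haToken);

    // Physical namenode addresses behind the logical URI.
    Collection<InetSocketAddress> nnAddrs = Arrays.asList(
        new InetSocketAddress("localhost", 8020),
        new InetSocketAddress("localhost", 8021));

    // Adds one private clone per namenode address, so the IPC layer can
    // match a token against the concrete host:port it dials.
    HAUtilClient.cloneDelegationTokenForLogicalUri(
        ugi, URI.create("hdfs://mycluster"), nnAddrs);

    // Expect 3: the logical HA token plus one clone per address.
    System.out.println(ugi.getTokens().size());
  }
}
```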
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index 5ca7030acd..a092f02630 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -28,6 +28,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,6 +214,14 @@ public static ProxyAndInfo<ClientProtocol> createProxyWithLossyRetryHandler(
   public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
       Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
       AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    return createFailoverProxyProvider(conf, nameNodeUri, xface, checkPort,
+        fallbackToSimpleAuth, new ClientHAProxyFactory<T>());
+  }
+
+  protected static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
+      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
+      AtomicBoolean fallbackToSimpleAuth, HAProxyFactory<T> proxyFactory)
+      throws IOException {
     Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
     AbstractNNFailoverProxyProvider<T> providerNN;
     try {
@@ -223,9 +233,10 @@ public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider
       }
       // Create a proxy provider instance.
       Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
-          .getConstructor(Configuration.class, URI.class, Class.class);
+          .getConstructor(Configuration.class, URI.class,
+              Class.class, HAProxyFactory.class);
       FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
-          xface);
+          xface, proxyFactory);
 
       // If the proxy provider is of an old implementation, wrap it.
       if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
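Because the constructor is resolved reflectively above, any provider class named under dfs.client.failover.proxy.provider.<nameservice> must now expose the four-argument (Configuration, URI, Class, HAProxyFactory) signature. A hedged sketch of a conforming custom provider; the class name is hypothetical:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;

public class MyFailoverProxyProvider<T> extends ConfiguredFailoverProxyProvider<T> {
  // The four-argument constructor that createFailoverProxyProvider
  // now looks up via getConstructor(...).
  public MyFailoverProxyProvider(Configuration conf, URI uri,
      Class<T> xface, HAProxyFactory<T> factory) {
    super(conf, uri, xface, factory);
  }
}
```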
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 1a388061ce..c152a4bf3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -67,6 +67,7 @@ public interface HdfsClientConfigKeys {
   String PREFIX = "dfs.client.";
   String DFS_NAMESERVICES = "dfs.nameservices";
+  String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
   int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
   String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
new file mode 100644
index 0000000000..b887d87100
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
+  @Override
+  @SuppressWarnings("unchecked")
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
+        nnAddr, conf, ugi, false, fallbackToSimpleAuth);
+  }
+
+  @Override
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries)
+      throws IOException {
+    return createProxy(conf, nnAddr, xface, ugi, withRetries, null);
+  }
+}
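For orientation, a hedged sketch of the client-side call path this factory serves: the public createFailoverProxyProvider overload plugs in ClientHAProxyFactory, so a caller only names the protocol. The nameservice must already be configured as in the earlier sketch:

```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;

public class ClientProviderSketch {
  public static AbstractNNFailoverProxyProvider<ClientProtocol> provider(
      Configuration conf) throws IOException {
    // Internally ends up in ClientHAProxyFactory.createProxy(...) for each
    // namenode; returns null when no failover provider is configured for
    // the URI's authority.
    return NameNodeProxiesClient.createFailoverProxyProvider(
        conf, URI.create("hdfs://mycluster"), ClientProtocol.class,
        true, null);
  }
}
```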
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
similarity index 76%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 0e8fa44880..e9c8791c5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -26,22 +26,16 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.HAUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A FailoverProxyProvider implementation which allows one to configure
@@ -51,25 +45,9 @@
  */
 public class ConfiguredFailoverProxyProvider<T> extends
     AbstractNNFailoverProxyProvider<T> {
-
-  private static final Log LOG =
-      LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
-
-  interface ProxyFactory<T> {
-    T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
-        UserGroupInformation ugi, boolean withRetries,
-        AtomicBoolean fallbackToSimpleAuth) throws IOException;
-  }
-
-  static class DefaultProxyFactory<T> implements ProxyFactory<T> {
-    @Override
-    public T createProxy(Configuration conf, InetSocketAddress nnAddr,
-        Class<T> xface, UserGroupInformation ugi, boolean withRetries,
-        AtomicBoolean fallbackToSimpleAuth)
-        throws IOException {
-      return NameNodeProxies.createNonHAProxy(conf,
-          nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
-    }
-  }
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
 
   protected final Configuration conf;
   protected final List<AddressRpcProxyPair<T>> proxies =
@@ -78,22 +56,11 @@ public T createProxy(Configuration conf, InetSocketAddress nnAddr,
   protected final Class<T> xface;
 
   private int currentProxyIndex = 0;
-  private final ProxyFactory<T> factory;
+  private final HAProxyFactory<T> factory;
 
   public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
-      Class<T> xface) {
-    this(conf, uri, xface, new DefaultProxyFactory<T>());
-  }
-
-  @VisibleForTesting
-  ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
-      Class<T> xface, ProxyFactory<T> factory) {
-
-    Preconditions.checkArgument(
-        xface.isAssignableFrom(NamenodeProtocols.class),
-        "Interface class %s is not a valid NameNode protocol!");
+      Class<T> xface, HAProxyFactory<T> factory) {
     this.xface = xface;
-
     this.conf = new Configuration(conf);
     int maxRetries = this.conf.getInt(
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
@@ -101,7 +68,7 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
     this.conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
         maxRetries);
-
+
     int maxRetriesOnSocketTimeouts = this.conf.getInt(
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
@@ -112,16 +79,16 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
 
     try {
       ugi = UserGroupInformation.getCurrentUser();
-
-      Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
-          conf);
+
+      Map<String, Map<String, InetSocketAddress>> map =
+          DFSUtilClient.getHaNnRpcAddresses(conf);
       Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
-
+
       if (addressesInNN == null || addressesInNN.size() == 0) {
         throw new RuntimeException("Could not find any configured addresses " +
             "for URI " + uri);
       }
-
+
       Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
       for (InetSocketAddress address : addressesOfNns) {
         proxies.add(new AddressRpcProxyPair<T>(address));
@@ -137,13 +104,13 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
       // The client may have a delegation token set for the logical
       // URI of the cluster. Clone this token to apply to each of the
      // underlying IPC addresses so that the IPC code can find it.
-      HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
+      HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
       this.factory = factory;
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
-
+
   @Override
   public Class<T> getInterface() {
     return xface;
@@ -183,7 +150,7 @@ synchronized void incrementProxyIndex() {
   private static class AddressRpcProxyPair<T> {
     public final InetSocketAddress address;
     public T namenode;
-
+
     public AddressRpcProxyPair(InetSocketAddress address) {
       this.address = address;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
new file mode 100644
index 0000000000..f92a74ff7c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This interface aims to decouple the proxy creation implementation used in
+ * {@link AbstractNNFailoverProxyProvider}. The client side can use
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} to initialize the
+ * proxy while the server side can use NamenodeProtocols.
+ */
+@InterfaceAudience.Private
+public interface HAProxyFactory<T> {
+
+  T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+      UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException;
+
+  T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+      UserGroupInformation ugi, boolean withRetries) throws IOException;
+
+}
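Since this interface is now the single extension point for proxy creation, a hedged sketch of a custom implementation; "TracingProxyFactory" is hypothetical and simply decorates the client factory added in this patch:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.security.UserGroupInformation;

public class TracingProxyFactory<T> extends ClientHAProxyFactory<T> {
  @Override
  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
      Class<T> xface, UserGroupInformation ugi, boolean withRetries,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    // Log each physical namenode a proxy is built for, then delegate.
    System.out.println("Creating " + xface.getSimpleName()
        + " proxy for " + nnAddr);
    return super.createProxy(conf, nnAddr, xface, ugi, withRetries,
        fallbackToSimpleAuth);
  }
}
```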
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
similarity index 87%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index 4e1cb9e68d..ed250a0f42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -25,14 +25,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A NNFailoverProxyProvider implementation which works on IP failover setup.
  * Only one proxy is used to connect to both servers and switching between
@@ -40,7 +36,7 @@
  * clients can consistently reach only one node at a time.
  *
  * Clients with a live connection will likely get connection reset after an
- * IP failover. This case will be handled by the 
+ * IP failover. This case will be handled by the
  * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is
  * not idempotent, it won't get retried.
 *
@@ -54,15 +50,14 @@ public class IPFailoverProxyProvider<T> extends
   private final Configuration conf;
   private final Class<T> xface;
   private final URI nameNodeUri;
+  private final HAProxyFactory<T> factory;
   private ProxyInfo<T> nnProxyInfo = null;
-
+
   public IPFailoverProxyProvider(Configuration conf, URI uri,
-      Class<T> xface) {
-    Preconditions.checkArgument(
-        xface.isAssignableFrom(NamenodeProtocols.class),
-        "Interface class %s is not a valid NameNode protocol!");
+      Class<T> xface, HAProxyFactory<T> factory) {
     this.xface = xface;
     this.nameNodeUri = uri;
+    this.factory = factory;
 
     this.conf = new Configuration(conf);
     int maxRetries = this.conf.getInt(
@@ -71,7 +66,7 @@ public IPFailoverProxyProvider(Configuration conf, URI uri,
     this.conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
         maxRetries);
-
+
     int maxRetriesOnSocketTimeouts = this.conf.getInt(
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
         HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
@@ -79,7 +74,7 @@ public IPFailoverProxyProvider(Configuration conf, URI uri,
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
         maxRetriesOnSocketTimeouts);
   }
-
+
   @Override
   public Class<T> getInterface() {
     return xface;
@@ -92,9 +87,8 @@ public synchronized ProxyInfo<T> getProxy() {
     try {
       // Create a proxy that is not wrapped in RetryProxy
       InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
-      nnProxyInfo = new ProxyInfo<T>(NameNodeProxies.createNonHAProxy(
-          conf, nnAddr, xface, UserGroupInformation.getCurrentUser(),
-          false).getProxy(), nnAddr.toString());
+      nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
+          UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
similarity index 95%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
index 2f6c9bc709..b94e94d67b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.io.retry.MultiException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,15 +146,9 @@ public Object call() throws Exception {
   private volatile ProxyInfo<T> successfulProxy = null;
   private volatile String toIgnore = null;
 
-  public RequestHedgingProxyProvider(
-      Configuration conf, URI uri, Class<T> xface) {
-    this(conf, uri, xface, new DefaultProxyFactory<T>());
-  }
-
-  @VisibleForTesting
-  RequestHedgingProxyProvider(Configuration conf, URI uri,
-      Class<T> xface, ProxyFactory<T> factory) {
-    super(conf, uri, xface, factory);
+  public RequestHedgingProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, HAProxyFactory<T> proxyFactory) {
+    super(conf, uri, xface, proxyFactory);
   }
 
   @SuppressWarnings("unchecked")
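The hedging provider is wired in through the same configuration key as the other providers. A hedged sketch of client wiring, with an illustrative nameservice and hosts:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.server.namenode.ha.RequestHedgingProxyProvider;

public class HedgingClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "machine1.foo.bar:9820");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "machine2.foo.bar:9820");
    // Fans the first request out to all namenodes and then sticks with the
    // one that answered successfully.
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        RequestHedgingProxyProvider.class.getName());

    FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
    System.out.println(fs.getUri());
  }
}
```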
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
similarity index 81%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
index 37532d5c88..724b5f01c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
@@ -29,9 +29,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.io.retry.MultiException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
@@ -66,20 +65,20 @@ public void setup() throws URISyntaxException {
     ns = "mycluster-" + Time.monotonicNow();
     nnUri = new URI("hdfs://" + ns);
     conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns);
+    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
     conf.set(
-        DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
+        HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
     conf.set(
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
         "machine1.foo.bar:9820");
     conf.set(
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
         "machine2.foo.bar:9820");
   }
 
   @Test
   public void testHedgingWhenOneFails() throws Exception {
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -87,11 +86,11 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
         return new long[]{1};
       }
     });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
             createFactory(badMock, goodMock));
     long[] stats = provider.getProxy().proxy.getStats();
     Assert.assertTrue(stats.length == 1);
@@ -101,7 +100,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
 
   @Test
   public void testHedgingWhenOneIsSlow() throws Exception {
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -109,11 +108,11 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
         return new long[]{1};
       }
     });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
            createFactory(goodMock, badMock));
     long[] stats = provider.getProxy().proxy.getStats();
     Assert.assertTrue(stats.length == 1);
@@ -124,14 +123,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
 
   @Test
   public void testHedgingWhenBothFail() throws Exception {
-    NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
-    NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(worseMock.getStats()).thenThrow(
         new IOException("Worse mock !!"));
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
            createFactory(badMock, worseMock));
     try {
       provider.getProxy().proxy.getStats();
@@ -147,7 +146,7 @@ public void testHedgingWhenBothFail() throws Exception {
   public void testPerformFailover() throws Exception {
     final AtomicInteger counter = new AtomicInteger(0);
     final int[] isGood = {1};
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -159,7 +158,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
         throw new IOException("Was Good mock !!");
       }
     });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -171,8 +170,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
         throw new IOException("Bad mock !!");
       }
     });
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
            createFactory(goodMock, badMock));
     long[] stats = provider.getProxy().proxy.getStats();
     Assert.assertTrue(stats.length == 1);
@@ -234,14 +233,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
 
   @Test
   public void testPerformFailoverWith3Proxies() throws Exception {
-    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+    conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
         "nn1,nn2,nn3");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
+    conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
         "machine3.foo.bar:9820");
 
     final AtomicInteger counter = new AtomicInteger(0);
     final int[] isGood = {1};
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -253,7 +252,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
         throw new IOException("Was Good mock !!");
       }
     });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -265,7 +264,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
         throw new IOException("Bad mock !!");
       }
     });
-    final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
+    final ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
     Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
       @Override
       public long[] answer(InvocationOnMock invocation) throws Throwable {
@@ -278,8 +277,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
       }
     });
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
            createFactory(goodMock, badMock, worseMock));
     long[] stats = provider.getProxy().proxy.getStats();
     Assert.assertTrue(stats.length == 1);
@@ -355,14 +354,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
 
   @Test
   public void testHedgingWhenFileNotFoundException() throws Exception {
-    NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
     Mockito
         .when(active.getBlockLocations(Matchers.anyString(),
             Matchers.anyLong(), Matchers.anyLong()))
         .thenThrow(new RemoteException("java.io.FileNotFoundException",
             "File does not exist!"));
 
-    NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
     Mockito
         .when(standby.getBlockLocations(Matchers.anyString(),
             Matchers.anyLong(), Matchers.anyLong()))
@@ -370,9 +369,9 @@ public void testHedgingWhenFileNotFoundException() throws Exception {
             new RemoteException("org.apache.hadoop.ipc.StandbyException",
             "Standby NameNode"));
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+    RequestHedgingProxyProvider<ClientProtocol> provider =
         new RequestHedgingProxyProvider<>(conf, nnUri,
-            NamenodeProtocols.class, createFactory(active, standby));
+            ClientProtocol.class, createFactory(active, standby));
     try {
       provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
       Assert.fail("Should fail since the active namenode throws"
@@ -394,18 +393,18 @@ public void testHedgingWhenFileNotFoundException() throws Exception {
 
   @Test
   public void testHedgingWhenConnectException() throws Exception {
-    NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
     Mockito.when(active.getStats()).thenThrow(new ConnectException());
 
-    NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
     Mockito.when(standby.getStats())
         .thenThrow(
             new RemoteException("org.apache.hadoop.ipc.StandbyException",
             "Standby NameNode"));
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+    RequestHedgingProxyProvider<ClientProtocol> provider =
         new RequestHedgingProxyProvider<>(conf, nnUri,
-            NamenodeProtocols.class, createFactory(active, standby));
+            ClientProtocol.class, createFactory(active, standby));
     try {
       provider.getProxy().proxy.getStats();
       Assert.fail("Should fail since the active namenode throws"
@@ -428,15 +427,15 @@ public void testHedgingWhenConnectException() throws Exception {
 
   @Test
   public void testHedgingWhenConnectAndEOFException() throws Exception {
-    NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
     Mockito.when(active.getStats()).thenThrow(new EOFException());
 
-    NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
     Mockito.when(standby.getStats()).thenThrow(new ConnectException());
 
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
+    RequestHedgingProxyProvider<ClientProtocol> provider =
         new RequestHedgingProxyProvider<>(conf, nnUri,
-            NamenodeProtocols.class, createFactory(active, standby));
+            ClientProtocol.class, createFactory(active, standby));
     try {
       provider.getProxy().proxy.getStats();
       Assert.fail("Should fail since both active and standby namenodes throw"
@@ -453,18 +452,25 @@ public void testHedgingWhenConnectAndEOFException() throws Exception {
     Mockito.verify(standby).getStats();
   }
 
-  private ProxyFactory<NamenodeProtocols> createFactory(
-      NamenodeProtocols... protos) {
-    final Iterator<NamenodeProtocols> iterator =
+  private HAProxyFactory<ClientProtocol> createFactory(
+      ClientProtocol... protos) {
+    final Iterator<ClientProtocol> iterator =
         Lists.newArrayList(protos).iterator();
-    return new ProxyFactory<NamenodeProtocols>() {
+    return new HAProxyFactory<ClientProtocol>() {
       @Override
-      public NamenodeProtocols createProxy(Configuration conf,
-          InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
+      public ClientProtocol createProxy(Configuration conf,
+          InetSocketAddress nnAddr, Class<ClientProtocol> xface,
           UserGroupInformation ugi, boolean withRetries,
           AtomicBoolean fallbackToSimpleAuth) throws IOException {
         return iterator.next();
       }
+
+      @Override
+      public ClientProtocol createProxy(Configuration conf,
+          InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+          UserGroupInformation ugi, boolean withRetries) throws IOException {
+        return iterator.next();
+      }
     };
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 06b33f977d..6ff7e5aa9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -142,7 +142,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
   public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
   public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
-  public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
+  public static final String DFS_NAMENODE_RPC_ADDRESS_KEY =
+      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
   public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
   public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
   public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 23166e26f1..47e1c0db30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -449,19 +449,6 @@ public static Set<String> getAllNnPrincipals(Configuration conf) throws IOExcept
     return principals;
   }
 
-  /**
-   * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
-   * the configuration.
-   *
-   * @param conf configuration
-   * @return list of InetSocketAddresses
-   */
-  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
-      Configuration conf) {
-    return DFSUtilClient.getAddresses(conf, null,
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
-  }
-
   /**
    * Returns list of InetSocketAddress corresponding to backup node rpc
    * addresses from the configuration.
@@ -693,7 +680,7 @@ public static String addressMapToString(
 
   public static String nnAddressesAsString(Configuration conf) {
     Map<String, Map<String, InetSocketAddress>> addresses =
-        getHaNnRpcAddresses(conf);
+        DFSUtilClient.getHaNnRpcAddresses(conf);
     return addressMapToString(addresses);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index ea535e9a5f..355608647c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -29,7 +29,6 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
-import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,8 +38,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -48,17 +45,12 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -67,12 +59,6 @@
 @InterfaceAudience.Private
 public class HAUtil {
 
-  private static final Log LOG =
-      LogFactory.getLog(HAUtil.class);
-
-  private static final DelegationTokenSelector tokenSelector =
-      new DelegationTokenSelector();
-
   private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String[]{
     DFS_NAMENODE_RPC_ADDRESS_KEY,
     DFS_NAMENODE_RPC_BIND_HOST_KEY,
@@ -97,7 +83,7 @@ private HAUtil() { /* Hidden constructor */ }
    */
   public static boolean isHAEnabled(Configuration conf, String nsId) {
     Map<String, Map<String, InetSocketAddress>> addresses =
-        DFSUtil.getHaNnRpcAddresses(conf);
+        DFSUtilClient.getHaNnRpcAddresses(conf);
     if (addresses == null) return false;
     Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
     return nnMap != null && nnMap.size() > 1;
@@ -259,47 +245,6 @@ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
     return provider.useLogicalURI();
   }
 
-  /**
-   * Locate a delegation token associated with the given HA cluster URI, and if
-   * one is found, clone it to also represent the underlying namenode address.
-   *
-   * @param ugi the UGI to modify
-   * @param haUri the logical URI for the cluster
-   * @param nnAddrs collection of NNs in the cluster to which the token
-   *          applies
-   */
-  public static void cloneDelegationTokenForLogicalUri(
-      UserGroupInformation ugi, URI haUri,
-      Collection<InetSocketAddress> nnAddrs) {
-    // this cloning logic is only used by hdfs
-    Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
-        HdfsConstants.HDFS_URI_SCHEME);
-    Token<DelegationTokenIdentifier> haToken =
-        tokenSelector.selectToken(haService, ugi.getTokens());
-    if (haToken != null) {
-      for (InetSocketAddress singleNNAddr : nnAddrs) {
-        // this is a minor hack to prevent physical HA tokens from being
-        // exposed to the user via UGI.getCredentials(), otherwise these
-        // cloned tokens may be inadvertently propagated to jobs
-        Token<DelegationTokenIdentifier> specificToken =
-            haToken.privateClone(buildTokenService(singleNNAddr));
-        Text alias = new Text(
-            HAUtilClient.buildTokenServicePrefixForLogicalUri(
-                HdfsConstants.HDFS_URI_SCHEME)
-                + "//" + specificToken.getService());
-        ugi.addToken(alias, specificToken);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Mapped HA service delegation token for logical URI " +
-              haUri + " to namenode " + singleNNAddr);
-        }
-      }
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No HA service delegation token found for logical URI " +
-            haUri);
-      }
-    }
-  }
-
   /**
    * Get the internet address of the currently-active NN. This should rarely be
    * used, since callers of this method who connect directly to the NN using the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index 61d701dfe8..d556c907c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.Text;
@@ -112,7 +113,7 @@ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
       throws IOException {
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
         NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
-            xface, true, fallbackToSimpleAuth);
+            xface, true, fallbackToSimpleAuth, new NameNodeHAProxyFactory<T>());
 
     if (failoverProxyProvider == null) {
       return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
index 8edaed6903..6b489fc5bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
@@ -17,19 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,25 +57,6 @@ protected void writeXml(Exception except, String path, XMLOutputter doc)
     doc.endTag();
   }
 
-  /**
-   * Create a {@link NameNode} proxy from the current {@link ServletContext}.
-   */
-  protected ClientProtocol createNameNodeProxy() throws IOException {
-    ServletContext context = getServletContext();
-    // if we are running in the Name Node, use it directly rather than via
-    // rpc
-    NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
-    if (nn != null) {
-      return nn.getRpcServer();
-    }
-    InetSocketAddress nnAddr =
-        NameNodeHttpServer.getNameNodeAddressFromContext(context);
-    Configuration conf = new HdfsConfiguration(
-        NameNodeHttpServer.getConfFromContext(context));
-    return NameNodeProxies.createProxy(conf, DFSUtilClient.getNNUri(nnAddr),
-        ClientProtocol.class).getProxy();
-  }
-
   protected UserGroupInformation getUGI(HttpServletRequest request,
                                         Configuration conf) throws IOException {
     return JspHelper.getUGI(getServletContext(), request, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
new file mode 100644
index 0000000000..036b6eb367
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
+
+  @Override
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
+        ugi, withRetries, fallbackToSimpleAuth).getProxy();
+  }
+
+  @Override
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries)
+      throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
+        ugi, withRetries).getProxy();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
index 7e8621b329..6265f44aa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
@@ -42,10 +42,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
@@ -333,7 +333,7 @@ public static class DummyLegacyFailoverProxyProvider<T>
     private Class<T> xface;
     private T proxy;
     public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri,
-        Class<T> xface) {
+        Class<T> xface, HAProxyFactory<T> proxyFactory) {
       try {
         this.proxy = NameNodeProxies.createNonHAProxy(conf,
             DFSUtilClient.getNNAddress(uri), xface,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 7257bbd793..14ad6dd8d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -513,7 +513,7 @@ public void testHANameNodesWithFederation() throws URISyntaxException {
         NS2_NN2_HOST);
 
     Map<String, Map<String, InetSocketAddress>> map =
-        DFSUtil.getHaNnRpcAddresses(conf);
+        DFSUtilClient.getHaNnRpcAddresses(conf);
 
     assertTrue(HAUtil.isHAEnabled(conf, "ns1"));
     assertTrue(HAUtil.isHAEnabled(conf, "ns2"));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index 632bbf688e..ca44c79801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -292,7 +292,7 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
         nn0.getNameNodeAddress().getPort()));
     nnAddrs.add(new InetSocketAddress("localhost",
         nn1.getNameNodeAddress().getPort()));
-    HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
+    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
 
     Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
     assertEquals(3, tokens.size());
@@ -321,7 +321,7 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
     }
 
     // reclone the tokens, and see if they match now
-    HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
+    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
     for (InetSocketAddress addr : nnAddrs) {
       Text ipcDtService = SecurityUtil.buildTokenService(addr);
       Token<DelegationTokenIdentifier> token2 =