diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index 0f42b77cb6..9b8b3beff9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -115,3 +115,5 @@ HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. (Uma
 HDFS-2795. Standby NN takes a long time to recover from a dead DN starting up. (todd)
 
 HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd)
+
+HDFS-2367. Enable the configuration of multiple HA cluster addresses. (atm)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 1dc2bf6758..ad2f8f67f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -125,15 +125,15 @@ public static void setAllowStandbyReads(Configuration conf, boolean val) {
   @SuppressWarnings("unchecked")
   public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
       Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
-      Class<T> xface) throws IOException {
+      Class<T> xface, URI nameNodeUri) throws IOException {
     Preconditions.checkArgument(
         xface.isAssignableFrom(NamenodeProtocols.class),
         "Interface %s is not a NameNode protocol", xface);
     try {
       Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
-          .getConstructor(Class.class);
-      FailoverProxyProvider<T> provider = ctor.newInstance(xface);
-      ReflectionUtils.setConf(provider, conf);
+          .getConstructor(Configuration.class, URI.class, Class.class);
+      FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
+          xface);
       return (FailoverProxyProvider<T>) provider;
     } catch (Exception e) {
       if (e.getCause() instanceof IOException) {
@@ -190,7 +190,8 @@ public static Object createFailoverProxy(Configuration conf, URI nameNodeUri,
         .getFailoverProxyProviderClass(conf, nameNodeUri, xface);
     if (failoverProxyProviderClass != null) {
       FailoverProxyProvider failoverProxyProvider = HAUtil
-          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface);
+          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
+              nameNodeUri);
       Conf config = new Conf(conf);
       return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
           .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index c44c1c1d74..d2d0c00b55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -20,6 +20,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -46,22 +47,59 @@
  * and on a fail-over event the other address is tried.
  */
 public class ConfiguredFailoverProxyProvider<T> implements
-    FailoverProxyProvider<T>, Configurable {
+    FailoverProxyProvider<T> {
 
   private static final Log LOG =
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
 
-  private Configuration conf;
-  private int currentProxyIndex = 0;
-  private List<AddressRpcProxyPair<T>> proxies = new ArrayList<AddressRpcProxyPair<T>>();
-  private UserGroupInformation ugi;
+  private final Configuration conf;
+  private final List<AddressRpcProxyPair<T>> proxies =
+      new ArrayList<AddressRpcProxyPair<T>>();
+  private final UserGroupInformation ugi;
   private final Class<T> xface;
+
+  private int currentProxyIndex = 0;
 
-  public ConfiguredFailoverProxyProvider(Class<T> xface) {
+  public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+      Class<T> xface) {
     Preconditions.checkArgument(
         xface.isAssignableFrom(NamenodeProtocols.class),
         "Interface class %s is not a valid NameNode protocol!");
     this.xface = xface;
+
+    this.conf = new Configuration(conf);
+    int maxRetries = this.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY,
+        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT);
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        maxRetries);
+
+    int maxRetriesOnSocketTimeouts = this.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        maxRetriesOnSocketTimeouts);
+
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+
+      Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
+          conf);
+      Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
+
+      if (addressesInNN == null || addressesInNN.size() == 0) {
+        throw new RuntimeException("Could not find any configured addresses " +
+            "for URI " + uri);
+      }
+
+      for (InetSocketAddress address : addressesInNN.values()) {
+        proxies.add(new AddressRpcProxyPair<T>(address));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -104,45 +142,6 @@ public synchronized void performFailover(T currentProxy) {
     currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
   }
 
-  @Override
-  public synchronized Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public synchronized void setConf(Configuration conf) {
-    this.conf = new Configuration(conf);
-    int maxRetries = this.conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY,
-        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT);
-    this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
-        maxRetries);
-
-    int maxRetriesOnSocketTimeouts = this.conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-        DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
-    this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-        maxRetriesOnSocketTimeouts);
-    try {
-      ugi = UserGroupInformation.getCurrentUser();
-
-      Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
-          conf);
-      // TODO(HA): currently hardcoding the nameservice used by MiniDFSCluster.
-      // We need to somehow communicate this into the proxy provider.
- String nsId = "nameserviceId1"; - Map addressesInNN = map.get(nsId); - - for (InetSocketAddress address : addressesInNN.values()) { - proxies.add(new AddressRpcProxyPair(address)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - /** * A little pair object to store the address and connected RPC proxy object to * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java index d06a606e54..a88e8a74ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -18,28 +18,32 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; +import java.net.URI; import java.net.URISyntaxException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; public class TestDFSClientFailover { + private static final Log LOG = LogFactory.getLog(TestDFSClientFailover.class); + private static final Path TEST_FILE = new Path("/tmp/failover-test-file"); private static final int FILE_LENGTH_TO_VERIFY = 100; @@ -49,8 +53,9 @@ public class TestDFSClientFailover { @Before public void setUpCluster() throws IOException { cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) .build(); + cluster.transitionToActive(0); cluster.waitActive(); } @@ -58,34 +63,22 @@ public void setUpCluster() throws IOException { public void tearDownCluster() throws IOException { cluster.shutdown(); } - - // TODO(HA): This test should probably be made to fail if a client fails over - // to talk to an NN with a different block pool id. Once failover between - // active/standy in a single block pool is implemented, this test should be - // changed to exercise that. + + /** + * Make sure that client failover works when an active NN dies and the standby + * takes over. 
+   */
   @Test
   public void testDfsClientFailover() throws IOException, URISyntaxException {
-    InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
-    InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
-
-    ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
-    ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
-
-    DFSClient dfsClient1 = new DFSClient(null, nn1, conf, null);
-    DFSClient dfsClient2 = new DFSClient(null, nn2, conf, null);
-
-    OutputStream out1 = dfsClient1.create(TEST_FILE.toString(), false);
-    OutputStream out2 = dfsClient2.create(TEST_FILE.toString(), false);
-    AppendTestUtil.write(out1, 0, FILE_LENGTH_TO_VERIFY);
-    AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
-    out1.close();
-    out2.close();
-
     FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
-    AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
-    cluster.getNameNode(0).stop();
-    AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
+    DFSTestUtil.createFile(fs, TEST_FILE,
+        FILE_LENGTH_TO_VERIFY, (short)1, 1L);
+
+    assertEquals(fs.getFileStatus(TEST_FILE).getLen(), FILE_LENGTH_TO_VERIFY);
+    cluster.shutdownNameNode(0);
+    cluster.transitionToActive(1);
+    assertEquals(fs.getFileStatus(TEST_FILE).getLen(), FILE_LENGTH_TO_VERIFY);
 
     // Check that it functions even if the URL becomes canonicalized
     // to include a port number.
@@ -115,4 +108,28 @@ public void testLogicalUriShouldNotHavePorts() {
           "does not use port information", ioe);
     }
   }
+
+  /**
+   * Make sure that a helpful error message is shown if a proxy provider is
+   * configured for a given URI, but no actual addresses are configured for that
+   * URI.
+   */
+  @Test
+  public void testFailureWithMisconfiguredHaNNs() throws Exception {
+    String logicalHost = "misconfigured-ha-uri";
+    Configuration conf = new Configuration();
+    conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalHost,
+        ConfiguredFailoverProxyProvider.class.getName());
+
+    URI uri = new URI("hdfs://" + logicalHost + "/test");
+    try {
+      FileSystem.get(uri, conf).exists(new Path("/test"));
+      fail("Successfully got proxy provider for misconfigured FS");
+    } catch (IOException ioe) {
+      LOG.info("got expected exception", ioe);
+      assertTrue("expected exception did not contain helpful message",
+          StringUtils.stringifyException(ioe).contains(
+            "Could not find any configured addresses for URI " + uri));
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index cee846d762..5536ba37b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -130,7 +130,7 @@ public CouldNotCatchUpException(String message) {
 
   /** Gets the filesystem instance by setting the failover configurations */
   public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
-      throws IOException, URISyntaxException {
+      throws IOException, URISyntaxException {
     conf = new Configuration(conf);
     String logicalName = getLogicalHostname(cluster);
     setFailoverConfigurations(cluster, conf, logicalName);
@@ -143,17 +143,17 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster,
       Configuration conf, String logicalName) {
     InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
     InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
-    String nsId = "nameserviceId1";
     String nameNodeId1 = "nn1";
     String nameNodeId2 = "nn2";
     String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
     String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
     conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        nsId, nameNodeId1), address1);
+        logicalName, nameNodeId1), address1);
     conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        nsId, nameNodeId2), address2);
-    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId);
-    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId),
+        logicalName, nameNodeId2), address2);
+
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, logicalName);
+    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, logicalName),
        nameNodeId1 + "," + nameNodeId2);
     conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
         ConfiguredFailoverProxyProvider.class.getName());
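
Note (not part of the patch): the sketch below illustrates the client-side configuration that the reworked ConfiguredFailoverProxyProvider(Configuration, URI, Class) constructor resolves, mirroring HATestUtil.setFailoverConfigurations above. The logical name, host names, and ports are placeholders, and the DFSConfigKeys constants are assumed to be the same ones referenced elsewhere in this patch.

// Illustrative sketch only; nameservice name and NameNode addresses are hypothetical.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;

public class HaClientConfigExample {
  public static void main(String[] args) throws Exception {
    String logicalName = "ha-cluster"; // authority of the logical hdfs:// URI
    Configuration conf = new Configuration();

    // Declare the nameservice and its two NameNodes.
    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, logicalName);
    conf.set(DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_HA_NAMENODES_KEY, logicalName), "nn1,nn2");
    conf.set(DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, "nn1"),
        "nn1.example.com:8020");
    conf.set(DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, "nn2"),
        "nn2.example.com:8020");

    // Point the client at ConfiguredFailoverProxyProvider for this logical URI.
    conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
        + "." + logicalName, ConfiguredFailoverProxyProvider.class.getName());

    // With this patch, the provider looks up uri.getHost() ("ha-cluster") in
    // DFSUtil.getHaNnRpcAddresses(conf) and builds one proxy per configured
    // address, instead of using a hardcoded nameservice ID.
    FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
    System.out.println("Connected to " + fs.getUri());
  }
}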