HDFS-2367. Enable the configuration of multiple HA cluster addresses. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1233549 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3e17cdde56
commit
02919e61f6
@ -115,3 +115,5 @@ HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. (Uma
|
||||
HDFS-2795. Standby NN takes a long time to recover from a dead DN starting up. (todd)
|
||||
|
||||
HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd)
|
||||
|
||||
HDFS-2367. Enable the configuration of multiple HA cluster addresses. (atm)
|
||||
|
@ -125,15 +125,15 @@ public static void setAllowStandbyReads(Configuration conf, boolean val) {
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
||||
Configuration conf, Class<FailoverProxyProvider<?>> failoverProxyProviderClass,
|
||||
Class xface) throws IOException {
|
||||
Class xface, URI nameNodeUri) throws IOException {
|
||||
Preconditions.checkArgument(
|
||||
xface.isAssignableFrom(NamenodeProtocols.class),
|
||||
"Interface %s is not a NameNode protocol", xface);
|
||||
try {
|
||||
Constructor<FailoverProxyProvider<?>> ctor = failoverProxyProviderClass
|
||||
.getConstructor(Class.class);
|
||||
FailoverProxyProvider<?> provider = ctor.newInstance(xface);
|
||||
ReflectionUtils.setConf(provider, conf);
|
||||
.getConstructor(Configuration.class, URI.class, Class.class);
|
||||
FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
|
||||
xface);
|
||||
return (FailoverProxyProvider<T>) provider;
|
||||
} catch (Exception e) {
|
||||
if (e.getCause() instanceof IOException) {
|
||||
@ -190,7 +190,8 @@ public static Object createFailoverProxy(Configuration conf, URI nameNodeUri,
|
||||
.getFailoverProxyProviderClass(conf, nameNodeUri, xface);
|
||||
if (failoverProxyProviderClass != null) {
|
||||
FailoverProxyProvider<?> failoverProxyProvider = HAUtil
|
||||
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface);
|
||||
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
|
||||
nameNodeUri);
|
||||
Conf config = new Conf(conf);
|
||||
return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
|
||||
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||
|
@ -20,6 +20,7 @@
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -46,22 +47,59 @@
|
||||
* and on a fail-over event the other address is tried.
|
||||
*/
|
||||
public class ConfiguredFailoverProxyProvider<T> implements
|
||||
FailoverProxyProvider<T>, Configurable {
|
||||
FailoverProxyProvider<T> {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
|
||||
|
||||
private Configuration conf;
|
||||
private int currentProxyIndex = 0;
|
||||
private List<AddressRpcProxyPair<T>> proxies = new ArrayList<AddressRpcProxyPair<T>>();
|
||||
private UserGroupInformation ugi;
|
||||
private final Configuration conf;
|
||||
private final List<AddressRpcProxyPair<T>> proxies =
|
||||
new ArrayList<AddressRpcProxyPair<T>>();
|
||||
private final UserGroupInformation ugi;
|
||||
private final Class<T> xface;
|
||||
|
||||
public ConfiguredFailoverProxyProvider(Class<T> xface) {
|
||||
private int currentProxyIndex = 0;
|
||||
|
||||
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
|
||||
Class<T> xface) {
|
||||
Preconditions.checkArgument(
|
||||
xface.isAssignableFrom(NamenodeProtocols.class),
|
||||
"Interface class %s is not a valid NameNode protocol!");
|
||||
this.xface = xface;
|
||||
|
||||
this.conf = new Configuration(conf);
|
||||
int maxRetries = this.conf.getInt(
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
maxRetries);
|
||||
|
||||
int maxRetriesOnSocketTimeouts = this.conf.getInt(
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
maxRetriesOnSocketTimeouts);
|
||||
|
||||
try {
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
|
||||
Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
|
||||
conf);
|
||||
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
|
||||
|
||||
if (addressesInNN == null || addressesInNN.size() == 0) {
|
||||
throw new RuntimeException("Could not find any configured addresses " +
|
||||
"for URI " + uri);
|
||||
}
|
||||
|
||||
for (InetSocketAddress address : addressesInNN.values()) {
|
||||
proxies.add(new AddressRpcProxyPair<T>(address));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -104,45 +142,6 @@ public synchronized void performFailover(T currentProxy) {
|
||||
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setConf(Configuration conf) {
|
||||
this.conf = new Configuration(conf);
|
||||
int maxRetries = this.conf.getInt(
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
maxRetries);
|
||||
|
||||
int maxRetriesOnSocketTimeouts = this.conf.getInt(
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
|
||||
this.conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||
maxRetriesOnSocketTimeouts);
|
||||
try {
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
|
||||
Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
|
||||
conf);
|
||||
// TODO(HA): currently hardcoding the nameservice used by MiniDFSCluster.
|
||||
// We need to somehow communicate this into the proxy provider.
|
||||
String nsId = "nameserviceId1";
|
||||
Map<String, InetSocketAddress> addressesInNN = map.get(nsId);
|
||||
|
||||
for (InetSocketAddress address : addressesInNN.values()) {
|
||||
proxies.add(new AddressRpcProxyPair<T>(address));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A little pair object to store the address and connected RPC proxy object to
|
||||
* an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
|
||||
|
@ -18,28 +18,32 @@
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestDFSClientFailover {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestDFSClientFailover.class);
|
||||
|
||||
private static final Path TEST_FILE = new Path("/tmp/failover-test-file");
|
||||
private static final int FILE_LENGTH_TO_VERIFY = 100;
|
||||
|
||||
@ -49,8 +53,9 @@ public class TestDFSClientFailover {
|
||||
@Before
|
||||
public void setUpCluster() throws IOException {
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||
.build();
|
||||
cluster.transitionToActive(0);
|
||||
cluster.waitActive();
|
||||
}
|
||||
|
||||
@ -59,33 +64,21 @@ public void tearDownCluster() throws IOException {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
// TODO(HA): This test should probably be made to fail if a client fails over
|
||||
// to talk to an NN with a different block pool id. Once failover between
|
||||
// active/standy in a single block pool is implemented, this test should be
|
||||
// changed to exercise that.
|
||||
/**
|
||||
* Make sure that client failover works when an active NN dies and the standby
|
||||
* takes over.
|
||||
*/
|
||||
@Test
|
||||
public void testDfsClientFailover() throws IOException, URISyntaxException {
|
||||
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
|
||||
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
|
||||
|
||||
ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
|
||||
ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
|
||||
|
||||
DFSClient dfsClient1 = new DFSClient(null, nn1, conf, null);
|
||||
DFSClient dfsClient2 = new DFSClient(null, nn2, conf, null);
|
||||
|
||||
OutputStream out1 = dfsClient1.create(TEST_FILE.toString(), false);
|
||||
OutputStream out2 = dfsClient2.create(TEST_FILE.toString(), false);
|
||||
AppendTestUtil.write(out1, 0, FILE_LENGTH_TO_VERIFY);
|
||||
AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
|
||||
out1.close();
|
||||
out2.close();
|
||||
|
||||
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
|
||||
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
||||
cluster.getNameNode(0).stop();
|
||||
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
||||
DFSTestUtil.createFile(fs, TEST_FILE,
|
||||
FILE_LENGTH_TO_VERIFY, (short)1, 1L);
|
||||
|
||||
assertEquals(fs.getFileStatus(TEST_FILE).getLen(), FILE_LENGTH_TO_VERIFY);
|
||||
cluster.shutdownNameNode(0);
|
||||
cluster.transitionToActive(1);
|
||||
assertEquals(fs.getFileStatus(TEST_FILE).getLen(), FILE_LENGTH_TO_VERIFY);
|
||||
|
||||
// Check that it functions even if the URL becomes canonicalized
|
||||
// to include a port number.
|
||||
@ -115,4 +108,28 @@ public void testLogicalUriShouldNotHavePorts() {
|
||||
"does not use port information", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that a helpful error message is shown if a proxy provider is
|
||||
* configured for a given URI, but no actual addresses are configured for that
|
||||
* URI.
|
||||
*/
|
||||
@Test
|
||||
public void testFailureWithMisconfiguredHaNNs() throws Exception {
|
||||
String logicalHost = "misconfigured-ha-uri";
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalHost,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
|
||||
URI uri = new URI("hdfs://" + logicalHost + "/test");
|
||||
try {
|
||||
FileSystem.get(uri, conf).exists(new Path("/test"));
|
||||
fail("Successfully got proxy provider for misconfigured FS");
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("got expected exception", ioe);
|
||||
assertTrue("expected exception did not contain helpful message",
|
||||
StringUtils.stringifyException(ioe).contains(
|
||||
"Could not find any configured addresses for URI " + uri));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ public CouldNotCatchUpException(String message) {
|
||||
|
||||
/** Gets the filesystem instance by setting the failover configurations */
|
||||
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
|
||||
throws IOException, URISyntaxException {
|
||||
throws IOException, URISyntaxException {
|
||||
conf = new Configuration(conf);
|
||||
String logicalName = getLogicalHostname(cluster);
|
||||
setFailoverConfigurations(cluster, conf, logicalName);
|
||||
@ -143,17 +143,17 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster,
|
||||
Configuration conf, String logicalName) {
|
||||
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
|
||||
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
|
||||
String nsId = "nameserviceId1";
|
||||
String nameNodeId1 = "nn1";
|
||||
String nameNodeId2 = "nn2";
|
||||
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
|
||||
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
nsId, nameNodeId1), address1);
|
||||
logicalName, nameNodeId1), address1);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
nsId, nameNodeId2), address2);
|
||||
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId),
|
||||
logicalName, nameNodeId2), address2);
|
||||
|
||||
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, logicalName);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, logicalName),
|
||||
nameNodeId1 + "," + nameNodeId2);
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
|
Loading…
Reference in New Issue
Block a user