HDFS-2418. Change ConfiguredFailoverProxyProvider to take advantage of HDFS-2231. (atm)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1190078 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ebb6cc60c4
commit
180646dea3
@ -15,3 +15,5 @@ HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the
|
||||
HDFS-2301. Start/stop appropriate namenode services when transition to active and standby states. (suresh)
|
||||
|
||||
HDFS-2231. Configuration changes for HA namenode. (suresh)
|
||||
|
||||
HDFS-2418. Change ConfiguredFailoverProxyProvider to take advantage of HDFS-2231. (atm)
|
||||
|
@ -406,18 +406,52 @@ private static List<InetSocketAddress> getAddresses(Configuration conf,
|
||||
}
|
||||
|
||||
// Get configuration suffixed with nameserviceId and/or namenodeId
|
||||
for (String nameserviceId : nameserviceIds) {
|
||||
for (String nnId : namenodeIds) {
|
||||
String keySuffix = concatSuffixes(nameserviceId, nnId);
|
||||
String address = getConfValue(null, keySuffix, conf, keys);
|
||||
if (address == null) {
|
||||
return null;
|
||||
if (federationEnabled && haEnabled) {
|
||||
for (String nameserviceId : nameserviceIds) {
|
||||
for (String nnId : namenodeIds) {
|
||||
String keySuffix = concatSuffixes(nameserviceId, nnId);
|
||||
String address = getConfValue(null, keySuffix, conf, keys);
|
||||
if (address != null) {
|
||||
isas.add(NetUtils.createSocketAddr(address));
|
||||
}
|
||||
}
|
||||
isas.add(NetUtils.createSocketAddr(address));
|
||||
}
|
||||
} else if (!federationEnabled && haEnabled) {
|
||||
for (String nnId : namenodeIds) {
|
||||
String address = getConfValue(null, nnId, conf, keys);
|
||||
if (address != null) {
|
||||
isas.add(NetUtils.createSocketAddr(address));
|
||||
}
|
||||
}
|
||||
} else if (federationEnabled && !haEnabled) {
|
||||
for (String nameserviceId : nameserviceIds) {
|
||||
String address = getConfValue(null, nameserviceId, conf, keys);
|
||||
if (address != null) {
|
||||
isas.add(NetUtils.createSocketAddr(address));
|
||||
}
|
||||
}
|
||||
}
|
||||
return isas;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
|
||||
* the configuration.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return list of InetSocketAddresses
|
||||
* @throws IOException if no addresses are configured
|
||||
*/
|
||||
public static List<InetSocketAddress> getHaNnRpcAddresses(
|
||||
Configuration conf) throws IOException {
|
||||
List<InetSocketAddress> addressList = getAddresses(conf, null,
|
||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
if (addressList == null) {
|
||||
throw new IOException("Incorrect configuration: HA name node addresses "
|
||||
+ DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured.");
|
||||
}
|
||||
return addressList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of InetSocketAddress corresponding to backup node rpc
|
||||
|
@ -64,7 +64,7 @@ public static String getNameNodeId(Configuration conf) {
|
||||
DFSUtil.LOCAL_ADDRESS_MATCHER)[1];
|
||||
if (namenodeId == null) {
|
||||
String msg = "Configuration " + DFS_NAMENODE_RPC_ADDRESS_KEY +
|
||||
" must be suffixed with" + " namenodeId for HA configuration.";
|
||||
" must be suffixed with" + namenodeId + " for HA configuration.";
|
||||
throw new HadoopIllegalArgumentException(msg);
|
||||
}
|
||||
return namenodeId;
|
||||
|
@ -19,8 +19,6 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@ -31,7 +29,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -44,9 +41,6 @@
|
||||
public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
|
||||
Configurable {
|
||||
|
||||
public static final String CONFIGURED_NAMENODE_ADDRESSES
|
||||
= "dfs.ha.namenode.addresses";
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
|
||||
|
||||
@ -93,22 +87,13 @@ public synchronized void setConf(Configuration conf) {
|
||||
try {
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
|
||||
Collection<String> addresses = conf.getTrimmedStringCollection(
|
||||
CONFIGURED_NAMENODE_ADDRESSES);
|
||||
if (addresses == null || addresses.size() == 0) {
|
||||
throw new RuntimeException(this.getClass().getSimpleName() +
|
||||
" is configured but " + CONFIGURED_NAMENODE_ADDRESSES +
|
||||
" is not set.");
|
||||
}
|
||||
for (String address : addresses) {
|
||||
proxies.add(new AddressRpcProxyPair(
|
||||
NameNode.getAddress(new URI(address).getAuthority())));
|
||||
Collection<InetSocketAddress> addresses = DFSUtil.getHaNnRpcAddresses(
|
||||
conf);
|
||||
for (InetSocketAddress address : addresses) {
|
||||
proxies.add(new AddressRpcProxyPair(address));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException("Malformed URI set in " +
|
||||
CONFIGURED_NAMENODE_ADDRESSES, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,8 @@
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
@ -59,9 +61,16 @@ public void tearDownCluster() throws IOException {
|
||||
// changed to exercise that.
|
||||
@Test
|
||||
public void testDfsClientFailover() throws IOException, URISyntaxException {
|
||||
final String nameServiceId = "name-service-uri";
|
||||
final String logicalNameNodeId = "ha-nn-uri";
|
||||
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
|
||||
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
|
||||
String nameServiceId1 = DFSUtil.getNameServiceIdFromAddress(conf, nnAddr1,
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
String nameServiceId2 = DFSUtil.getNameServiceIdFromAddress(conf, nnAddr2,
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
|
||||
String nameNodeId1 = "nn1";
|
||||
String nameNodeId2 = "nn2";
|
||||
|
||||
ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
|
||||
ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
|
||||
@ -78,13 +87,16 @@ public void testDfsClientFailover() throws IOException, URISyntaxException {
|
||||
|
||||
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
|
||||
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
|
||||
conf.set(ConfiguredFailoverProxyProvider.CONFIGURED_NAMENODE_ADDRESSES,
|
||||
address1 + "," + address2);
|
||||
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nameServiceId,
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
nameServiceId1, nameNodeId1), address1);
|
||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
nameServiceId2, nameNodeId2), address2);
|
||||
|
||||
conf.set(DFS_HA_NAMENODES_KEY, nameNodeId1 + "," + nameNodeId2);
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalNameNodeId,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
|
||||
FileSystem fs = FileSystem.get(new URI("hdfs://" + nameServiceId), conf);
|
||||
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalNameNodeId), conf);
|
||||
|
||||
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
||||
cluster.getNameNode(0).stop();
|
||||
|
Loading…
Reference in New Issue
Block a user