HDFS-2231. Configuration changes for HA namenode. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1182626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f00198b16c
commit
7ca7832158
@ -13,3 +13,5 @@ HDFS-2407. getServerDefaults and getStats don't check operation category (atm)
|
|||||||
HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)
|
HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)
|
||||||
|
|
||||||
HDFS-2301. Start/stop appropriate namenode services when transition to active and standby states. (suresh)
|
HDFS-2301. Start/stop appropriate namenode services when transition to active and standby states. (suresh)
|
||||||
|
|
||||||
|
HDFS-2231. Configuration changes for HA namenode. (suresh)
|
||||||
|
@ -245,10 +245,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
|
public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
|
||||||
public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
|
public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
|
||||||
|
|
||||||
// HA related configuration
|
|
||||||
public static final String DFS_HA_NAMENODE_IDS_KEY = "dfs.ha.namenode.ids";
|
|
||||||
public static final String DFS_HA_NAMENODE_IDS_DEFAULT = "";
|
|
||||||
|
|
||||||
// property for fsimage compression
|
// property for fsimage compression
|
||||||
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
|
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
|
||||||
public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;
|
public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;
|
||||||
@ -279,11 +275,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
|
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
|
||||||
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
|
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
|
||||||
|
|
||||||
public static final String DFS_FEDERATION_NAMESERVICES = "dfs.federation.nameservices";
|
public static final String DFS_FEDERATION_NAMESERVICES = "dfs.federation.nameservices";
|
||||||
public static final String DFS_FEDERATION_NAMESERVICE_ID = "dfs.federation.nameservice.id";
|
public static final String DFS_FEDERATION_NAMESERVICE_ID = "dfs.federation.nameservice.id";
|
||||||
public static final String DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
|
public static final String DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
|
||||||
public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
|
public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
|
||||||
public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
|
public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
|
||||||
public static final long DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
|
public static final long DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
|
||||||
public static final String DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes";
|
public static final String DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes";
|
||||||
|
|
||||||
|
// HA related configuration
|
||||||
|
public static final String DFS_HA_NAMENODES_KEY = "dfs.ha.namenodes";
|
||||||
|
public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
|
||||||
}
|
}
|
||||||
|
@ -18,13 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
@ -90,13 +84,20 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
|
|||||||
a.isDecommissioned() ? 1 : -1;
|
a.isDecommissioned() ? 1 : -1;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
/**
|
||||||
|
* Address matcher for matching an address to local address
|
||||||
|
*/
|
||||||
|
static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() {
|
||||||
|
public boolean match(InetSocketAddress s) {
|
||||||
|
return NetUtils.isLocalAddress(s.getAddress());
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether the pathname is valid. Currently prohibits relative paths,
|
* Whether the pathname is valid. Currently prohibits relative paths,
|
||||||
* and names which contain a ":" or "/"
|
* and names which contain a ":" or "/"
|
||||||
*/
|
*/
|
||||||
public static boolean isValidName(String src) {
|
public static boolean isValidName(String src) {
|
||||||
|
|
||||||
// Path must be absolute.
|
// Path must be absolute.
|
||||||
if (!src.startsWith(Path.SEPARATOR)) {
|
if (!src.startsWith(Path.SEPARATOR)) {
|
||||||
return false;
|
return false;
|
||||||
@ -298,6 +299,18 @@ public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
|
|||||||
public static Collection<String> getNameServiceIds(Configuration conf) {
|
public static Collection<String> getNameServiceIds(Configuration conf) {
|
||||||
return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
|
return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Namenode HighAvailability related configuration.
|
||||||
|
* Returns collection of namenode Ids from the configuration. One logical id
|
||||||
|
* for each namenode in the in the HA setup.
|
||||||
|
*
|
||||||
|
* @param conf configuration
|
||||||
|
* @return collection of namenode Ids
|
||||||
|
*/
|
||||||
|
public static Collection<String> getNameNodeIds(Configuration conf) {
|
||||||
|
return conf.getStringCollection(DFS_HA_NAMENODES_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a list of keys in the order of preference, returns a value
|
* Given a list of keys in the order of preference, returns a value
|
||||||
@ -312,9 +325,7 @@ private static String getConfValue(String defaultValue, String keySuffix,
|
|||||||
Configuration conf, String... keys) {
|
Configuration conf, String... keys) {
|
||||||
String value = null;
|
String value = null;
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
if (keySuffix != null) {
|
key = addSuffix(key, keySuffix);
|
||||||
key += "." + keySuffix;
|
|
||||||
}
|
|
||||||
value = conf.get(key);
|
value = conf.get(key);
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
break;
|
break;
|
||||||
@ -326,6 +337,37 @@ private static String getConfValue(String defaultValue, String keySuffix,
|
|||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Add non empty and non null suffix to a key */
|
||||||
|
private static String addSuffix(String key, String suffix) {
|
||||||
|
if (suffix == null || suffix.length() == 0) {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
if (!suffix.startsWith(".")) {
|
||||||
|
key += ".";
|
||||||
|
}
|
||||||
|
return key += suffix;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Concatenate list of suffix strings '.' separated */
|
||||||
|
private static String concatSuffixes(String... suffixes) {
|
||||||
|
if (suffixes == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String ret = "";
|
||||||
|
for (int i = 0; i < suffixes.length - 1; i++) {
|
||||||
|
ret = addSuffix(ret, suffixes[i]);
|
||||||
|
}
|
||||||
|
return addSuffix(ret, suffixes[suffixes.length - 1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return configuration key of format key.suffix1.suffix2...suffixN
|
||||||
|
*/
|
||||||
|
public static String addKeySuffixes(String key, String... suffixes) {
|
||||||
|
String keySuffix = concatSuffixes(suffixes);
|
||||||
|
return addSuffix(key, keySuffix);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns list of InetSocketAddress for a given set of keys.
|
* Returns list of InetSocketAddress for a given set of keys.
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
@ -336,19 +378,38 @@ private static String getConfValue(String defaultValue, String keySuffix,
|
|||||||
private static List<InetSocketAddress> getAddresses(Configuration conf,
|
private static List<InetSocketAddress> getAddresses(Configuration conf,
|
||||||
String defaultAddress, String... keys) {
|
String defaultAddress, String... keys) {
|
||||||
Collection<String> nameserviceIds = getNameServiceIds(conf);
|
Collection<String> nameserviceIds = getNameServiceIds(conf);
|
||||||
|
Collection<String> namenodeIds = getNameNodeIds(conf);
|
||||||
List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
|
List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
|
||||||
|
|
||||||
// Configuration with a single namenode
|
final boolean federationEnabled = nameserviceIds != null
|
||||||
if (nameserviceIds == null || nameserviceIds.isEmpty()) {
|
&& !nameserviceIds.isEmpty();
|
||||||
|
final boolean haEnabled = namenodeIds != null
|
||||||
|
&& !namenodeIds.isEmpty();
|
||||||
|
|
||||||
|
// Configuration with no federation and ha, return default address
|
||||||
|
if (!federationEnabled && !haEnabled) {
|
||||||
String address = getConfValue(defaultAddress, null, conf, keys);
|
String address = getConfValue(defaultAddress, null, conf, keys);
|
||||||
if (address == null) {
|
if (address == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
isas.add(NetUtils.createSocketAddr(address));
|
isas.add(NetUtils.createSocketAddr(address));
|
||||||
} else {
|
return isas;
|
||||||
// Get the namenodes for all the configured nameServiceIds
|
}
|
||||||
for (String nameserviceId : nameserviceIds) {
|
|
||||||
String address = getConfValue(null, nameserviceId, conf, keys);
|
if (!federationEnabled) {
|
||||||
|
nameserviceIds = new ArrayList<String>();
|
||||||
|
nameserviceIds.add(null);
|
||||||
|
}
|
||||||
|
if (!haEnabled) {
|
||||||
|
namenodeIds = new ArrayList<String>();
|
||||||
|
namenodeIds.add(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get configuration suffixed with nameserviceId and/or namenodeId
|
||||||
|
for (String nameserviceId : nameserviceIds) {
|
||||||
|
for (String nnId : namenodeIds) {
|
||||||
|
String keySuffix = concatSuffixes(nameserviceId, nnId);
|
||||||
|
String address = getConfValue(null, keySuffix, conf, keys);
|
||||||
if (address == null) {
|
if (address == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -431,12 +492,12 @@ public static List<InetSocketAddress> getNNServiceRpcAddresses(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given the InetSocketAddress for any configured communication with a
|
* Given the InetSocketAddress this method returns the nameservice Id
|
||||||
* namenode, this method returns the corresponding nameservice ID,
|
* corresponding to the key with matching address, by doing a reverse
|
||||||
* by doing a reverse lookup on the list of nameservices until it
|
* lookup on the list of nameservices until it finds a match.
|
||||||
* finds a match.
|
*
|
||||||
* If null is returned, client should try {@link #isDefaultNamenodeAddress}
|
* If null is returned, client should try {@link #isDefaultNamenodeAddress}
|
||||||
* to check pre-Federated configurations.
|
* to check pre-Federation, non-HA configurations.
|
||||||
* Since the process of resolving URIs to Addresses is slightly expensive,
|
* Since the process of resolving URIs to Addresses is slightly expensive,
|
||||||
* this utility method should not be used in performance-critical routines.
|
* this utility method should not be used in performance-critical routines.
|
||||||
*
|
*
|
||||||
@ -453,58 +514,43 @@ public static List<InetSocketAddress> getNNServiceRpcAddresses(
|
|||||||
* not the NameServiceId-suffixed keys.
|
* not the NameServiceId-suffixed keys.
|
||||||
* @return nameserviceId, or null if no match found
|
* @return nameserviceId, or null if no match found
|
||||||
*/
|
*/
|
||||||
public static String getNameServiceIdFromAddress(Configuration conf,
|
public static String getNameServiceIdFromAddress(final Configuration conf,
|
||||||
InetSocketAddress address, String... keys) {
|
final InetSocketAddress address, String... keys) {
|
||||||
Collection<String> nameserviceIds = getNameServiceIds(conf);
|
|
||||||
|
|
||||||
// Configuration with a single namenode and no nameserviceId
|
// Configuration with a single namenode and no nameserviceId
|
||||||
if (nameserviceIds == null || nameserviceIds.isEmpty()) {
|
if (!isFederationEnabled(conf)) {
|
||||||
// client should try {@link isDefaultNamenodeAddress} instead
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
// Get the candidateAddresses for all the configured nameServiceIds
|
String[] ids = getSuffixIDs(conf, address, keys);
|
||||||
for (String nameserviceId : nameserviceIds) {
|
return (ids != null && ids.length > 0) ? ids[0] : null;
|
||||||
for (String key : keys) {
|
|
||||||
String candidateAddress = conf.get(
|
|
||||||
getNameServiceIdKey(key, nameserviceId));
|
|
||||||
if (candidateAddress != null
|
|
||||||
&& address.equals(NetUtils.createSocketAddr(candidateAddress)))
|
|
||||||
return nameserviceId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// didn't find a match
|
|
||||||
// client should try {@link isDefaultNamenodeAddress} instead
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return server http or https address from the configuration
|
* return server http or https address from the configuration for a
|
||||||
|
* given namenode rpc address.
|
||||||
* @param conf
|
* @param conf
|
||||||
* @param namenode - namenode address
|
* @param namenodeAddr - namenode RPC address
|
||||||
* @param httpsAddress -If true, and if security is enabled, returns server
|
* @param httpsAddress -If true, and if security is enabled, returns server
|
||||||
* https address. If false, returns server http address.
|
* https address. If false, returns server http address.
|
||||||
* @return server http or https address
|
* @return server http or https address
|
||||||
*/
|
*/
|
||||||
public static String getInfoServer(
|
public static String getInfoServer(
|
||||||
InetSocketAddress namenode, Configuration conf, boolean httpsAddress) {
|
InetSocketAddress namenodeAddr, Configuration conf, boolean httpsAddress) {
|
||||||
String httpAddress = null;
|
String httpAddress = null;
|
||||||
|
boolean securityOn = UserGroupInformation.isSecurityEnabled();
|
||||||
String httpAddressKey = (UserGroupInformation.isSecurityEnabled()
|
String httpAddressKey = (securityOn && httpsAddress) ?
|
||||||
&& httpsAddress) ? DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY
|
DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||||
: DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
String httpAddressDefault = (securityOn && httpsAddress) ?
|
||||||
String httpAddressDefault = (UserGroupInformation.isSecurityEnabled()
|
DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT : DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
|
||||||
&& httpsAddress) ? DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT
|
if (namenodeAddr != null) {
|
||||||
: DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
|
|
||||||
if(namenode != null) {
|
|
||||||
// if non-default namenode, try reverse look up
|
// if non-default namenode, try reverse look up
|
||||||
// the nameServiceID if it is available
|
// the nameServiceID if it is available
|
||||||
String nameServiceId = DFSUtil.getNameServiceIdFromAddress(
|
String nameServiceId = DFSUtil.getNameServiceIdFromAddress(
|
||||||
conf, namenode,
|
conf, namenodeAddr,
|
||||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
|
|
||||||
if (nameServiceId != null) {
|
if (nameServiceId != null) {
|
||||||
httpAddress = conf.get(DFSUtil.getNameServiceIdKey(
|
httpAddress = conf.get(DFSUtil.addKeySuffixes(
|
||||||
httpAddressKey, nameServiceId));
|
httpAddressKey, nameServiceId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -512,7 +558,6 @@ public static String getInfoServer(
|
|||||||
if (httpAddress == null) {
|
if (httpAddress == null) {
|
||||||
httpAddress = conf.get(httpAddressKey, httpAddressDefault);
|
httpAddress = conf.get(httpAddressKey, httpAddressDefault);
|
||||||
}
|
}
|
||||||
|
|
||||||
return httpAddress;
|
return httpAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -548,30 +593,27 @@ public static boolean isDefaultNamenodeAddress(Configuration conf,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return key specific to a nameserviceId from a generic key
|
|
||||||
*/
|
|
||||||
public static String getNameServiceIdKey(String key, String nameserviceId) {
|
|
||||||
return key + "." + nameserviceId;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the node specific setting into generic configuration key. Looks up
|
* Sets the node specific setting into generic configuration key. Looks up
|
||||||
* value of "key.nameserviceId" and if found sets that value into generic key
|
* value of "key.nameserviceId.namenodeId" and if found sets that value into
|
||||||
* in the conf. Note that this only modifies the runtime conf.
|
* generic key in the conf. Note that this only modifies the runtime conf.
|
||||||
*
|
*
|
||||||
* @param conf
|
* @param conf
|
||||||
* Configuration object to lookup specific key and to set the value
|
* Configuration object to lookup specific key and to set the value
|
||||||
* to the key passed. Note the conf object is modified.
|
* to the key passed. Note the conf object is modified.
|
||||||
* @param nameserviceId
|
* @param nameserviceId
|
||||||
* nameservice Id to construct the node specific key.
|
* nameservice Id to construct the node specific key. Pass null if
|
||||||
|
* federation is not configuration.
|
||||||
|
* @param nnId
|
||||||
|
* namenode Id to construct the node specific key. Pass null if
|
||||||
|
* HA is not configured.
|
||||||
* @param keys
|
* @param keys
|
||||||
* The key for which node specific value is looked up
|
* The key for which node specific value is looked up
|
||||||
*/
|
*/
|
||||||
public static void setGenericConf(Configuration conf,
|
public static void setGenericConf(Configuration conf,
|
||||||
String nameserviceId, String... keys) {
|
String nameserviceId, String nnId, String... keys) {
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
String value = conf.get(getNameServiceIdKey(key, nameserviceId));
|
String value = conf.get(addKeySuffixes(key, nameserviceId, nnId));
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
conf.set(key, value);
|
conf.set(key, value);
|
||||||
}
|
}
|
||||||
@ -580,12 +622,12 @@ public static void setGenericConf(Configuration conf,
|
|||||||
|
|
||||||
/** Return used as percentage of capacity */
|
/** Return used as percentage of capacity */
|
||||||
public static float getPercentUsed(long used, long capacity) {
|
public static float getPercentUsed(long used, long capacity) {
|
||||||
return capacity <= 0 ? 100 : ((float)used * 100.0f)/(float)capacity;
|
return capacity <= 0 ? 100 : (used * 100.0f)/capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return remaining as percentage of capacity */
|
/** Return remaining as percentage of capacity */
|
||||||
public static float getPercentRemaining(long remaining, long capacity) {
|
public static float getPercentRemaining(long remaining, long capacity) {
|
||||||
return capacity <= 0 ? 0 : ((float)remaining * 100.0f)/(float)capacity;
|
return capacity <= 0 ? 0 : (remaining * 100.0f)/capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -687,23 +729,21 @@ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|||||||
UserGroupInformation ticket = UserGroupInformation
|
UserGroupInformation ticket = UserGroupInformation
|
||||||
.createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
|
.createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
|
||||||
ticket.addToken(locatedBlock.getBlockToken());
|
ticket.addToken(locatedBlock.getBlockToken());
|
||||||
return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
|
return RPC.getProxy(ClientDatanodeProtocol.class,
|
||||||
ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
|
ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
|
||||||
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if HA for namenode is configured.
|
* Returns true if federation configuration is enabled
|
||||||
* @param conf Configuration
|
|
||||||
* @return true if HA is configured in the configuration; else false.
|
|
||||||
*/
|
*/
|
||||||
public static boolean isHAEnabled(Configuration conf) {
|
public static boolean isFederationEnabled(Configuration conf) {
|
||||||
// TODO:HA configuration changes pending
|
Collection<String> collection = getNameServiceIds(conf);
|
||||||
return false;
|
return collection != null && collection.size() != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get name service Id for the {@link NameNode} based on namenode RPC address
|
* Get nameservice Id for the {@link NameNode} based on namenode RPC address
|
||||||
* matching the local node address.
|
* matching the local node address.
|
||||||
*/
|
*/
|
||||||
public static String getNamenodeNameServiceId(Configuration conf) {
|
public static String getNamenodeNameServiceId(Configuration conf) {
|
||||||
@ -711,7 +751,7 @@ public static String getNamenodeNameServiceId(Configuration conf) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get name service Id for the BackupNode based on backup node RPC address
|
* Get nameservice Id for the BackupNode based on backup node RPC address
|
||||||
* matching the local node address.
|
* matching the local node address.
|
||||||
*/
|
*/
|
||||||
public static String getBackupNameServiceId(Configuration conf) {
|
public static String getBackupNameServiceId(Configuration conf) {
|
||||||
@ -719,7 +759,7 @@ public static String getBackupNameServiceId(Configuration conf) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get name service Id for the secondary node based on secondary http address
|
* Get nameservice Id for the secondary node based on secondary http address
|
||||||
* matching the local node address.
|
* matching the local node address.
|
||||||
*/
|
*/
|
||||||
public static String getSecondaryNameServiceId(Configuration conf) {
|
public static String getSecondaryNameServiceId(Configuration conf) {
|
||||||
@ -732,12 +772,12 @@ public static String getSecondaryNameServiceId(Configuration conf) {
|
|||||||
*
|
*
|
||||||
* If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically
|
* If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically
|
||||||
* configured, this method determines the nameservice Id by matching the local
|
* configured, this method determines the nameservice Id by matching the local
|
||||||
* nodes address with the configured addresses. When a match is found, it
|
* node's address with the configured addresses. When a match is found, it
|
||||||
* returns the nameservice Id from the corresponding configuration key.
|
* returns the nameservice Id from the corresponding configuration key.
|
||||||
*
|
*
|
||||||
* @param conf Configuration
|
* @param conf Configuration
|
||||||
* @param addressKey configuration key to get the address.
|
* @param addressKey configuration key to get the address.
|
||||||
* @return name service Id on success, null on failure.
|
* @return nameservice Id on success, null if federation is not configured.
|
||||||
* @throws HadoopIllegalArgumentException on error
|
* @throws HadoopIllegalArgumentException on error
|
||||||
*/
|
*/
|
||||||
private static String getNameServiceId(Configuration conf, String addressKey) {
|
private static String getNameServiceId(Configuration conf, String addressKey) {
|
||||||
@ -745,33 +785,104 @@ private static String getNameServiceId(Configuration conf, String addressKey) {
|
|||||||
if (nameserviceId != null) {
|
if (nameserviceId != null) {
|
||||||
return nameserviceId;
|
return nameserviceId;
|
||||||
}
|
}
|
||||||
|
if (!isFederationEnabled(conf)) {
|
||||||
Collection<String> ids = getNameServiceIds(conf);
|
|
||||||
if (ids == null || ids.size() == 0) {
|
|
||||||
// Not federation configuration, hence no nameservice Id
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
nameserviceId = getSuffixIDs(conf, addressKey, LOCAL_ADDRESS_MATCHER)[0];
|
||||||
// Match the rpc address with that of local address
|
if (nameserviceId == null) {
|
||||||
int found = 0;
|
String msg = "Configuration " + addressKey + " must be suffixed with" +
|
||||||
for (String id : ids) {
|
" nameserviceId for federation configuration.";
|
||||||
String addr = conf.get(getNameServiceIdKey(addressKey, id));
|
throw new HadoopIllegalArgumentException(msg);
|
||||||
InetSocketAddress s = NetUtils.createSocketAddr(addr);
|
|
||||||
if (NetUtils.isLocalAddress(s.getAddress())) {
|
|
||||||
nameserviceId = id;
|
|
||||||
found++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (found > 1) { // Only one address must match the local address
|
|
||||||
throw new HadoopIllegalArgumentException(
|
|
||||||
"Configuration has multiple RPC addresses that matches "
|
|
||||||
+ "the local node's address. Please configure the system with "
|
|
||||||
+ "the parameter " + DFS_FEDERATION_NAMESERVICE_ID);
|
|
||||||
}
|
|
||||||
if (found == 0) {
|
|
||||||
throw new HadoopIllegalArgumentException("Configuration address "
|
|
||||||
+ addressKey + " is missing in configuration with name service Id");
|
|
||||||
}
|
}
|
||||||
return nameserviceId;
|
return nameserviceId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns nameservice Id and namenode Id when the local host matches the
|
||||||
|
* configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @param addressKey configuration key corresponding to the address.
|
||||||
|
* @param matcher matching criteria for matching the address
|
||||||
|
* @return Array with nameservice Id and namenode Id on success. First element
|
||||||
|
* in the array is nameservice Id and second element is namenode Id.
|
||||||
|
* Null value indicates that the configuration does not have the the
|
||||||
|
* Id.
|
||||||
|
* @throws HadoopIllegalArgumentException on error
|
||||||
|
*/
|
||||||
|
static String[] getSuffixIDs(final Configuration conf, final String addressKey,
|
||||||
|
final AddressMatcher matcher) {
|
||||||
|
Collection<String> nsIds = getNameServiceIds(conf);
|
||||||
|
boolean federationEnabled = true;
|
||||||
|
if (nsIds == null || nsIds.size() == 0) {
|
||||||
|
federationEnabled = false; // federation not configured
|
||||||
|
nsIds = new ArrayList<String>();
|
||||||
|
nsIds.add(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean haEnabled = true;
|
||||||
|
Collection<String> nnIds = getNameNodeIds(conf);
|
||||||
|
if (nnIds == null || nnIds.size() == 0) {
|
||||||
|
haEnabled = false; // HA not configured
|
||||||
|
nnIds = new ArrayList<String>();
|
||||||
|
nnIds.add(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Match the address from addressKey.nsId.nnId based on the given matcher
|
||||||
|
String nameserviceId = null;
|
||||||
|
String namenodeId = null;
|
||||||
|
int found = 0;
|
||||||
|
for (String nsId : nsIds) {
|
||||||
|
for (String nnId : nnIds) {
|
||||||
|
String key = addKeySuffixes(addressKey, nsId, nnId);
|
||||||
|
String addr = conf.get(key);
|
||||||
|
InetSocketAddress s = null;
|
||||||
|
try {
|
||||||
|
s = NetUtils.createSocketAddr(addr);
|
||||||
|
} catch (Exception e) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (matcher.match(s)) {
|
||||||
|
nameserviceId = nsId;
|
||||||
|
namenodeId = nnId;
|
||||||
|
found++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (found > 1) { // Only one address must match the local address
|
||||||
|
String msg = "Configuration has multiple addresses that match "
|
||||||
|
+ "local node's address. Please configure the system with "
|
||||||
|
+ (federationEnabled ? DFS_FEDERATION_NAMESERVICE_ID : "")
|
||||||
|
+ (haEnabled ? (" and " + DFS_HA_NAMENODE_ID_KEY) : "");
|
||||||
|
throw new HadoopIllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
return new String[] { nameserviceId, namenodeId };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For given set of {@code keys} adds nameservice Id and or namenode Id
|
||||||
|
* and returns {nameserviceId, namenodeId} when address match is found.
|
||||||
|
* @see #getSuffixIDs(Configuration, String, AddressMatcher)
|
||||||
|
*/
|
||||||
|
static String[] getSuffixIDs(final Configuration conf,
|
||||||
|
final InetSocketAddress address, final String... keys) {
|
||||||
|
AddressMatcher matcher = new AddressMatcher() {
|
||||||
|
@Override
|
||||||
|
public boolean match(InetSocketAddress s) {
|
||||||
|
return address.equals(s);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for (String key : keys) {
|
||||||
|
String[] ids = getSuffixIDs(conf, key, matcher);
|
||||||
|
if (ids != null && (ids [0] != null || ids[1] != null)) {
|
||||||
|
return ids;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface AddressMatcher {
|
||||||
|
public boolean match(InetSocketAddress s);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,91 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
public class HAUtil {
|
||||||
|
private HAUtil() { /* Hidden constructor */ }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if HA for namenode is configured.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return true if HA is configured in the configuration; else false.
|
||||||
|
*/
|
||||||
|
public static boolean isHAEnabled(Configuration conf) {
|
||||||
|
Collection<String> collection = DFSUtil.getNameNodeIds(conf);
|
||||||
|
return collection != null && !collection.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the namenode Id by matching the {@code addressKey}
|
||||||
|
* with the the address of the local node.
|
||||||
|
*
|
||||||
|
* If {@link DFSConfigKeys#DFS_HA_NAMENODE_ID_KEY} is not specifically
|
||||||
|
* configured, this method determines the namenode Id by matching the local
|
||||||
|
* node's address with the configured addresses. When a match is found, it
|
||||||
|
* returns the namenode Id from the corresponding configuration key.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return namenode Id on success, null on failure.
|
||||||
|
* @throws HadoopIllegalArgumentException on error
|
||||||
|
*/
|
||||||
|
public static String getNameNodeId(Configuration conf) {
|
||||||
|
String namenodeId = conf.get(DFS_HA_NAMENODE_ID_KEY);
|
||||||
|
if (namenodeId != null) {
|
||||||
|
return namenodeId;
|
||||||
|
}
|
||||||
|
if (!isHAEnabled(conf)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
namenodeId = DFSUtil.getSuffixIDs(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
|
DFSUtil.LOCAL_ADDRESS_MATCHER)[1];
|
||||||
|
if (namenodeId == null) {
|
||||||
|
String msg = "Configuration " + DFS_NAMENODE_RPC_ADDRESS_KEY +
|
||||||
|
" must be suffixed with" + " namenodeId for HA configuration.";
|
||||||
|
throw new HadoopIllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
return namenodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to
|
||||||
|
* {@link DFSUtil#getNameServiceIdFromAddress(Configuration,
|
||||||
|
* InetSocketAddress, String...)}
|
||||||
|
*/
|
||||||
|
public static String getNameNodeIdFromAddress(final Configuration conf,
|
||||||
|
final InetSocketAddress address, String... keys) {
|
||||||
|
// Configuration with a single namenode and no nameserviceId
|
||||||
|
if (!isHAEnabled(conf)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] ids = DFSUtil.getSuffixIDs(conf, address, keys);
|
||||||
|
if (ids != null && ids.length > 1) {
|
||||||
|
return ids[1];
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.fs.Trash;
|
import org.apache.hadoop.fs.Trash;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
@ -526,7 +527,7 @@ protected NameNode(Configuration conf, NamenodeRole role)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.role = role;
|
this.role = role;
|
||||||
this.haEnabled = DFSUtil.isHAEnabled(conf);
|
this.haEnabled = HAUtil.isHAEnabled(conf);
|
||||||
this.haContext = new NameNodeHAContext();
|
this.haContext = new NameNodeHAContext();
|
||||||
try {
|
try {
|
||||||
initializeGenericKeys(conf, getNameServiceId(conf));
|
initializeGenericKeys(conf, getNameServiceId(conf));
|
||||||
@ -841,15 +842,18 @@ public static NameNode createNameNode(String argv[], Configuration conf)
|
|||||||
* Configuration object to lookup specific key and to set the value
|
* Configuration object to lookup specific key and to set the value
|
||||||
* to the key passed. Note the conf object is modified
|
* to the key passed. Note the conf object is modified
|
||||||
* @param nameserviceId name service Id
|
* @param nameserviceId name service Id
|
||||||
* @see DFSUtil#setGenericConf(Configuration, String, String...)
|
* @see DFSUtil#setGenericConf(Configuration, String, String, String...)
|
||||||
*/
|
*/
|
||||||
public static void initializeGenericKeys(Configuration conf, String
|
public static void initializeGenericKeys(Configuration conf, String
|
||||||
nameserviceId) {
|
nameserviceId) {
|
||||||
if ((nameserviceId == null) || nameserviceId.isEmpty()) {
|
String namenodeId = HAUtil.getNameNodeId(conf);
|
||||||
|
if ((nameserviceId == null || nameserviceId.isEmpty()) &&
|
||||||
|
(namenodeId == null || namenodeId.isEmpty())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSUtil.setGenericConf(conf, nameserviceId, NAMESERVICE_SPECIFIC_KEYS);
|
DFSUtil.setGenericConf(conf, nameserviceId, namenodeId,
|
||||||
|
NAMESERVICE_SPECIFIC_KEYS);
|
||||||
if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
|
if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
|
||||||
URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
|
URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
|
||||||
+ conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
|
+ conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
|
||||||
|
@ -587,11 +587,11 @@ private static void initFederationConf(Configuration conf,
|
|||||||
private static void initFederatedNamenodeAddress(Configuration conf,
|
private static void initFederatedNamenodeAddress(Configuration conf,
|
||||||
String nameserviceId, int nnPort) {
|
String nameserviceId, int nnPort) {
|
||||||
// Set nameserviceId specific key
|
// Set nameserviceId specific key
|
||||||
String key = DFSUtil.getNameServiceIdKey(
|
String key = DFSUtil.addKeySuffixes(
|
||||||
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId);
|
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId);
|
||||||
conf.set(key, "127.0.0.1:0");
|
conf.set(key, "127.0.0.1:0");
|
||||||
|
|
||||||
key = DFSUtil.getNameServiceIdKey(
|
key = DFSUtil.addKeySuffixes(
|
||||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId);
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId);
|
||||||
conf.set(key, "127.0.0.1:" + nnPort);
|
conf.set(key, "127.0.0.1:" + nnPort);
|
||||||
}
|
}
|
||||||
@ -644,10 +644,10 @@ private void createFederatedNameNode(int nnIndex, Configuration conf,
|
|||||||
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
|
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
|
||||||
NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs,
|
NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs,
|
||||||
format, operation, clusterId);
|
format, operation, clusterId);
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode
|
||||||
.getHostPortString(nn.getNameNodeAddress()));
|
.getHostPortString(nn.getNameNodeAddress()));
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId), NameNode
|
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId), NameNode
|
||||||
.getHostPortString(nn.getHttpAddress()));
|
.getHostPortString(nn.getHttpAddress()));
|
||||||
DFSUtil.setGenericConf(conf, nameserviceId,
|
DFSUtil.setGenericConf(conf, nameserviceId,
|
||||||
|
@ -86,7 +86,7 @@ public void testLocatedBlocks2Locations() {
|
|||||||
private Configuration setupAddress(String key) {
|
private Configuration setupAddress(String key) {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICES, "nn1");
|
conf.set(DFS_FEDERATION_NAMESERVICES, "nn1");
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(key, "nn1"), "localhost:9000");
|
conf.set(DFSUtil.addKeySuffixes(key, "nn1"), "localhost:9000");
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ public void getNameServiceId() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test {@link DFSUtil#getNameNodeNameServiceId(Configuration)} to ensure
|
* Test {@link DFSUtil#getNamenodeNameServiceId(Configuration)} to ensure
|
||||||
* nameserviceId for namenode is determined based on matching the address with
|
* nameserviceId for namenode is determined based on matching the address with
|
||||||
* local node's address
|
* local node's address
|
||||||
*/
|
*/
|
||||||
@ -135,7 +135,7 @@ public void getSecondaryNameServiceId() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test {@link DFSUtil#getNameServiceId(Configuration, String))} to ensure
|
* Test {@link DFSUtil#getNamenodeNameServiceId(Configuration)} to ensure
|
||||||
* exception is thrown when multiple rpc addresses match the local node's
|
* exception is thrown when multiple rpc addresses match the local node's
|
||||||
* address
|
* address
|
||||||
*/
|
*/
|
||||||
@ -143,9 +143,9 @@ public void getSecondaryNameServiceId() {
|
|||||||
public void testGetNameServiceIdException() {
|
public void testGetNameServiceIdException() {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICES, "nn1,nn2");
|
conf.set(DFS_FEDERATION_NAMESERVICES, "nn1,nn2");
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"),
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"),
|
||||||
"localhost:9000");
|
"localhost:9000");
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
|
||||||
"localhost:9001");
|
"localhost:9001");
|
||||||
DFSUtil.getNamenodeNameServiceId(conf);
|
DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
fail("Expected exception is not thrown");
|
fail("Expected exception is not thrown");
|
||||||
@ -178,9 +178,9 @@ public void testMultipleNamenodes() throws IOException {
|
|||||||
final String NN1_ADDRESS = "localhost:9000";
|
final String NN1_ADDRESS = "localhost:9000";
|
||||||
final String NN2_ADDRESS = "localhost:9001";
|
final String NN2_ADDRESS = "localhost:9001";
|
||||||
final String NN3_ADDRESS = "localhost:9002";
|
final String NN3_ADDRESS = "localhost:9002";
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"),
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"),
|
||||||
NN1_ADDRESS);
|
NN1_ADDRESS);
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
|
||||||
NN2_ADDRESS);
|
NN2_ADDRESS);
|
||||||
|
|
||||||
Collection<InetSocketAddress> nnAddresses = DFSUtil
|
Collection<InetSocketAddress> nnAddresses = DFSUtil
|
||||||
@ -247,7 +247,7 @@ public void testDefaultNamenode() throws IOException {
|
|||||||
* copied to generic keys when the namenode starts.
|
* copied to generic keys when the namenode starts.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testConfModification() throws IOException {
|
public void testConfModification() {
|
||||||
final HdfsConfiguration conf = new HdfsConfiguration();
|
final HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICES, "nn1");
|
conf.set(DFS_FEDERATION_NAMESERVICES, "nn1");
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICE_ID, "nn1");
|
conf.set(DFS_FEDERATION_NAMESERVICE_ID, "nn1");
|
||||||
@ -256,7 +256,7 @@ public void testConfModification() throws IOException {
|
|||||||
// Set the nameservice specific keys with nameserviceId in the config key
|
// Set the nameservice specific keys with nameserviceId in the config key
|
||||||
for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
|
for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
|
||||||
// Note: value is same as the key
|
// Note: value is same as the key
|
||||||
conf.set(DFSUtil.getNameServiceIdKey(key, nameserviceId), key);
|
conf.set(DFSUtil.addKeySuffixes(key, nameserviceId), key);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize generic keys from specific keys
|
// Initialize generic keys from specific keys
|
||||||
@ -282,18 +282,21 @@ public void testEmptyConf() {
|
|||||||
DFSUtil.getNNServiceRpcAddresses(conf);
|
DFSUtil.getNNServiceRpcAddresses(conf);
|
||||||
fail("Expected IOException is not thrown");
|
fail("Expected IOException is not thrown");
|
||||||
} catch (IOException expected) {
|
} catch (IOException expected) {
|
||||||
|
/** Expected */
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DFSUtil.getBackupNodeAddresses(conf);
|
DFSUtil.getBackupNodeAddresses(conf);
|
||||||
fail("Expected IOException is not thrown");
|
fail("Expected IOException is not thrown");
|
||||||
} catch (IOException expected) {
|
} catch (IOException expected) {
|
||||||
|
/** Expected */
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DFSUtil.getSecondaryNameNodeAddresses(conf);
|
DFSUtil.getSecondaryNameNodeAddresses(conf);
|
||||||
fail("Expected IOException is not thrown");
|
fail("Expected IOException is not thrown");
|
||||||
} catch (IOException expected) {
|
} catch (IOException expected) {
|
||||||
|
/** Expected */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1088,9 +1088,9 @@ public void testMultipleSecondaryNamenodes() throws IOException {
|
|||||||
snConf2.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "");
|
snConf2.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "");
|
||||||
|
|
||||||
// Set the nameserviceIds
|
// Set the nameserviceIds
|
||||||
snConf1.set(DFSUtil.getNameServiceIdKey(
|
snConf1.set(DFSUtil.addKeySuffixes(
|
||||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nameserviceId1), nn1);
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nameserviceId1), nn1);
|
||||||
snConf2.set(DFSUtil.getNameServiceIdKey(
|
snConf2.set(DFSUtil.addKeySuffixes(
|
||||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nameserviceId2), nn2);
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nameserviceId2), nn2);
|
||||||
|
|
||||||
SecondaryNameNode secondary1 = startSecondaryNameNode(snConf1);
|
SecondaryNameNode secondary1 = startSecondaryNameNode(snConf1);
|
||||||
|
@ -72,7 +72,7 @@ private String[] setupAddress(HdfsConfiguration conf, String key,
|
|||||||
String[] values = new String[nameServiceIdCount];
|
String[] values = new String[nameServiceIdCount];
|
||||||
for (int i = 0; i < nameServiceIdCount; i++, portOffset++) {
|
for (int i = 0; i < nameServiceIdCount; i++, portOffset++) {
|
||||||
String nsID = getNameServiceId(i);
|
String nsID = getNameServiceId(i);
|
||||||
String specificKey = DFSUtil.getNameServiceIdKey(key, nsID);
|
String specificKey = DFSUtil.addKeySuffixes(key, nsID);
|
||||||
values[i] = "nn" + i + ":" + portOffset;
|
values[i] = "nn" + i + ":" + portOffset;
|
||||||
conf.set(specificKey, values[i]);
|
conf.set(specificKey, values[i]);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user