HDFS-13224. RBF: Resolvers to support mount points across multiple subclusters. Contributed by Inigo Goiri.

Inigo Goiri 2018-03-15 10:32:30 -07:00
parent da777a5498
commit e71bc00a47
19 changed files with 1589 additions and 21 deletions

View File

@ -240,7 +240,7 @@ private void invalidateLocationCache(final String path) {
PathLocation loc = entry.getValue();
String src = loc.getSourcePath();
if (src != null) {
if(isParentEntry(src, path)) {
if (isParentEntry(src, path)) {
LOG.debug("Removing {}", src);
it.remove();
}
@ -306,7 +306,7 @@ public void refreshEntries(final Collection<MountTable> entries) {
existingEntry, entry);
this.tree.put(srcPath, entry);
invalidateLocationCache(srcPath);
LOG.info("Updated mount point {} in resolver");
LOG.info("Updated mount point {} in resolver", srcPath);
}
}
}
@ -515,7 +515,7 @@ private static PathLocation buildLocation(
String nsId = oneDst.getNameserviceId();
String dest = oneDst.getDest();
String newPath = dest;
if (!newPath.endsWith(Path.SEPARATOR)) {
if (!newPath.endsWith(Path.SEPARATOR) && !remainingPath.isEmpty()) {
newPath += Path.SEPARATOR;
}
newPath += remainingPath;

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import java.io.IOException;
import java.util.EnumMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.OrderedResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.RandomResolver;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Mount table resolver that supports multiple locations for each mount entry.
* The returned location contains remote paths ordered from highest to lowest
* priority. Multiple locations for a mount point are optional. When multiple
* locations are specified, each one is checked for the presence of a file, and
* the nameservice for a new file/dir is chosen based on the results of a
* consistent hashing algorithm.
* <p>
* Does the Mount table entry for this path have multiple destinations?
* <ul>
* <li>No -> Return the location
* <li>Yes -> Return all locations, prioritizing the best guess from the
* consistent hashing algorithm.
* </ul>
* <p>
* It has multiple options to order the locations: HASH (default), LOCAL,
* RANDOM, and HASH_ALL.
* <p>
* The consistent hashing result depends on the number and combination of
* nameservices that are registered for a particular mount point. The order of
* nameservices/locations in the mount table is not prioritized. Each consistent
* hash calculation considers only the set of unique nameservices present for
* the mount table location.
*/
public class MultipleDestinationMountTableResolver extends MountTableResolver {
private static final Logger LOG =
LoggerFactory.getLogger(MultipleDestinationMountTableResolver.class);
/** Resolvers that use a particular order for multiple destinations. */
private EnumMap<DestinationOrder, OrderedResolver> orderedResolvers =
new EnumMap<>(DestinationOrder.class);
public MultipleDestinationMountTableResolver(
Configuration conf, Router router) {
super(conf, router);
// Initialize the ordered resolvers
addResolver(DestinationOrder.HASH, new HashFirstResolver());
addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
addResolver(DestinationOrder.RANDOM, new RandomResolver());
addResolver(DestinationOrder.HASH_ALL, new HashResolver());
}
@Override
public PathLocation getDestinationForPath(String path) throws IOException {
PathLocation mountTableResult = super.getDestinationForPath(path);
if (mountTableResult == null) {
LOG.error("The {} cannot find a location for {}",
super.getClass().getSimpleName(), path);
} else if (mountTableResult.hasMultipleDestinations()) {
DestinationOrder order = mountTableResult.getDestinationOrder();
OrderedResolver orderedResolver = orderedResolvers.get(order);
if (orderedResolver == null) {
LOG.error("Cannot find resolver for order {}", order);
} else {
String firstNamespace =
orderedResolver.getFirstNamespace(path, mountTableResult);
// Change the order of the name spaces according to the policy
if (firstNamespace != null) {
// This is the entity in the tree, we need to create our own copy
mountTableResult = new PathLocation(mountTableResult, firstNamespace);
LOG.debug("Ordered locations following {} are {}",
order, mountTableResult);
} else {
LOG.error("Cannot get main namespace for path {} with order {}",
path, order);
}
}
}
return mountTableResult;
}
@VisibleForTesting
public void addResolver(DestinationOrder order, OrderedResolver resolver) {
orderedResolvers.put(order, resolver);
}
}
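
For context, a minimal usage sketch (mirroring the test setup later in this commit; the mount path, subcluster names, and the null Router are illustrative, and the usual java.util and federation imports are assumed):

// Hedged sketch: resolve a path against a two-destination mount entry.
Configuration conf = new Configuration();
MultipleDestinationMountTableResolver resolver =
    new MultipleDestinationMountTableResolver(conf, null);
Map<String, String> dests = new HashMap<>();
dests.put("subcluster0", "/data");
dests.put("subcluster1", "/data");
MountTable entry = MountTable.newInstance("/data", dests);
entry.setDestOrder(DestinationOrder.HASH_ALL);
resolver.addEntry(entry);
// The first destination is the consistent-hashing pick for this file.
PathLocation loc = resolver.getDestinationForPath("/data/file0.txt");
String nsId = loc.getDefaultLocation().getNameserviceId();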

View File

@ -23,7 +23,8 @@
* this determines which location should be checked first.
*/
public enum DestinationOrder {
HASH, // Follow consistent hashing
HASH, // Follow consistent hashing in the first folder level
LOCAL, // Local first
RANDOM // Random order
RANDOM, // Random order
HASH_ALL // Follow consistent hashing
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
/**
* Variation of HashResolver that only uses the first level of the path.
*/
public class HashFirstResolver extends HashResolver {
@Override
public String getFirstNamespace(final String path, final PathLocation loc) {
String srcPath = loc.getSourcePath();
String trimmedPath = trimPathToChild(path, srcPath);
LOG.debug("Only using the first part of the path: {} -> {}",
path, trimmedPath);
return super.getFirstNamespace(trimmedPath, loc);
}
/**
* Hash only up to the immediate child of the mount point. This prevents the
* need to create/maintain subtrees under each multi-destination mount point.
* Each child of a multi-destination mount is mapped to only one hash
* location.
* <p>
* Trims a path to at most the immediate child of a parent path. For example:
* <ul>
* <li>path = /a/b/c, parent = /a will be trimmed to /a/b.
* <li>path = /a/b, parent = /a/b will be trimmed to /a/b
* </ul>
*
* @param path The path to trim.
* @param parent The parent used to find the immediate child.
* @return Trimmed path.
*/
private static String trimPathToChild(String path, String parent) {
// Path is invalid or equal to the parent
if (path.length() <= parent.length()) {
return parent;
}
String remainder = path.substring(parent.length());
String[] components =
remainder.replaceFirst("^/", "").split(Path.SEPARATOR);
if (components.length > 0 && components[0].length() > 0) {
if (parent.endsWith(Path.SEPARATOR)) {
return parent + components[0];
} else {
return parent + Path.SEPARATOR + components[0];
}
} else {
return parent;
}
}
}
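
As a hedged illustration of the trimming (mirroring the testHashFirst case later in this commit), every path under the same first-level folder of a HASH mount resolves to the same subcluster:

// Assuming a resolver with /hash mounted to several subclusters using
// DestinationOrder.HASH: only "/hash/folder0" is fed to the hash ring,
// so all three calls return the same first namespace.
PathLocation d1 = resolver.getDestinationForPath("/hash/folder0");
PathLocation d2 = resolver.getDestinationForPath("/hash/folder0/file0.txt");
PathLocation d3 =
    resolver.getDestinationForPath("/hash/folder0/folder1/file1.txt");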

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.utils.ConsistentHashRing;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Order the destinations based on consistent hashing.
*/
public class HashResolver implements OrderedResolver {
protected static final Logger LOG =
LoggerFactory.getLogger(HashResolver.class);
/** Namespace set hash -> Locator. */
private final Map<Integer, ConsistentHashRing> hashResolverMap;
/** Patterns for temporary files. */
private static final String HEX_PATTERN = "\\p{XDigit}";
private static final String UUID_PATTERN = HEX_PATTERN + "{8}-" +
HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" +
HEX_PATTERN + "{12}";
private static final String ATTEMPT_PATTERN =
"attempt_\\d+_\\d{4}_._\\d{6}_\\d{2}";
private static final String[] TEMP_FILE_PATTERNS = {
"(.+)\\.COPYING$",
"(.+)\\._COPYING_.*$",
"(.+)\\.tmp$",
"_temp/(.+)$",
"_temporary/(.+)\\." + UUID_PATTERN + "$",
"(.*)_temporary/\\d/_temporary/" + ATTEMPT_PATTERN + "/(.+)$" };
/** Pattern for temporary files (logical OR of the individual patterns). */
private static final Pattern TEMP_FILE_PATTERN =
Pattern.compile(StringUtils.join("|", TEMP_FILE_PATTERNS));
public HashResolver() {
this.hashResolverMap = new ConcurrentHashMap<>();
}
/**
* Use the result from the consistent hashing locator to prioritize the
* locations for a path.
*
* @param path Path to check.
* @param loc Federated location with multiple destinations.
* @return First namespace based on hash.
*/
@Override
public String getFirstNamespace(final String path, final PathLocation loc) {
String finalPath = extractTempFileName(path);
Set<String> namespaces = loc.getNamespaces();
ConsistentHashRing locator = getHashResolver(namespaces);
String hashedSubcluster = locator.getLocation(finalPath);
if (hashedSubcluster == null) {
String srcPath = loc.getSourcePath();
LOG.error("Cannot find subcluster for {} ({} -> {})",
srcPath, path, finalPath);
}
LOG.debug("Namespace for {} ({}) is {}", path, finalPath, hashedSubcluster);
return hashedSubcluster;
}
/**
* Get the cached hash resolver (if available) or generate a new one for this
* particular set of unique namespace identifiers.
*
* @param namespaces A set of unique namespace identifiers.
* @return A hash resolver configured to consistently resolve paths to
* namespaces using the provided set of namespace identifiers.
*/
private ConsistentHashRing getHashResolver(final Set<String> namespaces) {
int hash = namespaces.hashCode();
ConsistentHashRing resolver = this.hashResolverMap.get(hash);
if (resolver == null) {
resolver = new ConsistentHashRing(namespaces);
this.hashResolverMap.put(hash, resolver);
}
return resolver;
}
/**
* Some files use a temporary naming pattern. Extract the final name from the
* temporary name. For example, files *._COPYING_ will be renamed, so we
* remove that chunk.
*
* @param input Input string.
* @return Final file name.
*/
@VisibleForTesting
public static String extractTempFileName(final String input) {
StringBuilder sb = new StringBuilder();
Matcher matcher = TEMP_FILE_PATTERN.matcher(input);
if (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
String match = matcher.group(i);
if (match != null) {
sb.append(match);
}
}
}
if (sb.length() > 0) {
String ret = sb.toString();
LOG.debug("Extracted {} from {}", ret, input);
return ret;
}
return input;
}
}
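
Since extractTempFileName is public, the temporary-name handling can be exercised directly; a short sketch with inputs taken from the tests later in this commit:

// The final name is extracted so a temporary file and its rename target
// hash to the same subcluster.
String a = HashResolver.extractTempFileName("testfile1.txt._COPYING_");
// a is "testfile1.txt"
String b = HashResolver.extractTempFileName(
    "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/part-00003");
// b is "part-00003"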

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The local subcluster (where the writer is) should be tried first. The writer
* is determined from the RPC call received by the RPC server.
*/
public class LocalResolver implements OrderedResolver {
private static final Logger LOG =
LoggerFactory.getLogger(LocalResolver.class);
/** Configuration key to set the minimum time to update the local cache. */
public static final String MIN_UPDATE_PERIOD_KEY =
DFSConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
/** 10 seconds by default. */
private static final long MIN_UPDATE_PERIOD_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
/** Router service. */
private final Router router;
/** Minimum update time. */
private final long minUpdateTime;
/** Node IP -> Subcluster. */
private Map<String, String> nodeSubcluster = null;
/** Last time the subcluster map was updated. */
private long lastUpdated;
public LocalResolver(final Configuration conf, final Router routerService) {
this.minUpdateTime = conf.getTimeDuration(
MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
TimeUnit.MILLISECONDS);
this.router = routerService;
}
/**
* Get the local name space. This relies on the RPC Server to get the address
* from the client.
*
* TODO we only support DN and NN locations, we need to add others like
* Resource Managers.
*
* @param path Path ignored by this policy.
* @param loc Federated location with multiple destinations.
* @return Local name space. Null if we don't know about this machine.
*/
@Override
public String getFirstNamespace(final String path, final PathLocation loc) {
String localSubcluster = null;
String clientAddr = getClientAddr();
Map<String, String> nodeToSubcluster = getSubclusterMappings();
if (nodeToSubcluster != null) {
localSubcluster = nodeToSubcluster.get(clientAddr);
if (localSubcluster != null) {
LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
} else {
LOG.error("Cannot get local namespace for {}", clientAddr);
}
} else {
LOG.error("Cannot get node mapping when resolving {} at {} from {}",
path, loc, clientAddr);
}
return localSubcluster;
}
@VisibleForTesting
String getClientAddr() {
return Server.getRemoteAddress();
}
/**
* Get the mapping from nodes to subcluster. It gets this mapping from the
* subclusters through expensive calls (e.g., RPC) and uses caching to avoid
* too many calls. The cache might be updated asynchronously to reduce
* latency.
*
* @return Node IP -> Subcluster.
*/
@VisibleForTesting
synchronized Map<String, String> getSubclusterMappings() {
if (nodeSubcluster == null ||
(monotonicNow() - lastUpdated) > minUpdateTime) {
// Fetch the mapping asynchronously
Thread updater = new Thread(new Runnable() {
@Override
public void run() {
Map<String, String> mapping = new HashMap<>();
Map<String, String> dnSubcluster = getDatanodesSubcluster();
if (dnSubcluster != null) {
mapping.putAll(dnSubcluster);
}
Map<String, String> nnSubcluster = getNamenodesSubcluster();
if (nnSubcluster != null) {
mapping.putAll(nnSubcluster);
}
nodeSubcluster = mapping;
lastUpdated = monotonicNow();
}
});
updater.start();
// Wait until initialized
if (nodeSubcluster == null) {
try {
LOG.debug("Wait to get the mapping for the first time");
updater.join();
} catch (InterruptedException e) {
LOG.error("Cannot wait for the updater to finish");
}
}
}
return nodeSubcluster;
}
/**
* Get the Datanode to subcluster mapping from the Namenodes. This needs to
* be done as a privileged action to use the Router's user and not the one
* from the client in the RPC call.
*
* @return DN IP -> Subcluster.
*/
private Map<String, String> getDatanodesSubcluster() {
final RouterRpcServer rpcServer = getRpcServer();
if (rpcServer == null) {
LOG.error("Cannot access the Router RPC server");
return null;
}
Map<String, String> ret = new HashMap<>();
try {
// We need to get the DNs as a privileged user
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs(
new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() {
@Override
public Map<String, DatanodeStorageReport[]> run() {
try {
return rpcServer.getDatanodeStorageReportMap(
DatanodeReportType.ALL);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
return null;
}
}
});
for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) {
String nsId = entry.getKey();
DatanodeStorageReport[] dns = entry.getValue();
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
String ipAddr = dnInfo.getIpAddr();
ret.put(ipAddr, nsId);
}
}
} catch (IOException e) {
LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage());
}
return ret;
}
/**
* Get the Namenode to subcluster mapping from the Membership store. As the
* Routers are usually co-located with Namenodes, we also check the local
* address of this Router here.
*
* @return NN IP -> Subcluster.
*/
private Map<String, String> getNamenodesSubcluster() {
final MembershipStore membershipStore = getMembershipStore();
if (membershipStore == null) {
LOG.error("Cannot access the Membership store");
return null;
}
// Manage requests from this hostname (127.0.0.1)
String localIp = "127.0.0.1";
String localHostname = localIp;
try {
localHostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Cannot get local host name");
}
Map<String, String> ret = new HashMap<>();
try {
// Get the values from the store
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance();
GetNamenodeRegistrationsResponse response =
membershipStore.getNamenodeRegistrations(request);
final List<MembershipState> nns = response.getNamenodeMemberships();
for (MembershipState nn : nns) {
try {
String nsId = nn.getNameserviceId();
String rpcAddress = nn.getRpcAddress();
String hostname = HostAndPort.fromString(rpcAddress).getHostText();
ret.put(hostname, nsId);
if (hostname.equals(localHostname)) {
ret.put(localIp, nsId);
}
InetAddress addr = InetAddress.getByName(hostname);
String ipAddr = addr.getHostAddress();
ret.put(ipAddr, nsId);
} catch (Exception e) {
LOG.error("Cannot get address for {}: {}", nn, e.getMessage());
}
}
} catch (IOException ioe) {
LOG.error("Cannot get Namenodes from the State Store: {}",
ioe.getMessage());
}
return ret;
}
/**
* Get the Router RPC server.
*
* @return Router RPC server. Null if not possible.
*/
private RouterRpcServer getRpcServer() {
if (this.router == null) {
return null;
}
return router.getRpcServer();
}
/**
* Get the Membership store.
*
* @return Membership store.
*/
private MembershipStore getMembershipStore() {
StateStoreService stateStore = router.getStateStore();
if (stateStore == null) {
return null;
}
return stateStore.getRegisteredRecordStore(MembershipStore.class);
}
}
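
The refresh interval of the node-to-subcluster cache is configurable; a hedged sketch of overriding the 10-second default (with the standard FEDERATION_ROUTER_PREFIX the key resolves to dfs.federation.router.local-resolver.update-period; the router instance and TimeUnit import are assumed):

// Refresh the cached node-to-subcluster mapping at most every 30 seconds.
Configuration conf = new Configuration();
conf.setTimeDuration(
    LocalResolver.MIN_UPDATE_PERIOD_KEY, 30, TimeUnit.SECONDS);
LocalResolver localResolver = new LocalResolver(conf, router);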

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
/**
* Policy that decides which should be the first location accessed given
* multiple destinations.
*/
public interface OrderedResolver {
/**
* Get the first namespace based on this resolver approach.
*
* @param path Path to check.
* @param loc Federated location with multiple destinations.
* @return First namespace out of the locations.
*/
String getFirstNamespace(String path, PathLocation loc);
}
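
Because the contract is a single method, a custom policy can be plugged into MultipleDestinationMountTableResolver through its public addResolver method, as TestLocalResolver later does with a spy; a hypothetical sketch:

// Hypothetical policy that pins everything to one subcluster; a real
// policy would inspect the path or the location.
OrderedResolver pinned = new OrderedResolver() {
  @Override
  public String getFirstNamespace(String path, PathLocation loc) {
    return "subcluster0";
  }
};
// Override the resolver used for the RANDOM order (testing only).
resolver.addResolver(DestinationOrder.RANDOM, pinned);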

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Order the destinations randomly.
*/
public class RandomResolver implements OrderedResolver {
private static final Logger LOG =
LoggerFactory.getLogger(RandomResolver.class);
/** Random number generator. */
private static final Random RANDOM = new Random();
/**
* Get a random name space from the location's destinations.
*
* @param path Path ignored by this policy.
* @param loc Federated location with multiple destinations.
* @return Random name space.
*/
public String getFirstNamespace(final String path, final PathLocation loc) {
if (loc == null) {
return null;
}
Set<String> namespaces = loc.getNamespaces();
if (namespaces == null || namespaces.isEmpty()) {
LOG.error("Cannot get namespaces for {}", loc);
return null;
}
List<String> nssList = new ArrayList<>(namespaces);
int index = RANDOM.nextInt(nssList.size());
return nssList.get(index);
}
}

View File

@ -1204,31 +1204,56 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
checkOperation(OperationCategory.UNCHECKED);
Map<String, DatanodeStorageReport> datanodesMap = new HashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
new Class<?>[] {DatanodeReportType.class}, type);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
rpcClient.invokeConcurrent(
nss, method, true, false, DatanodeStorageReport[].class);
for (DatanodeStorageReport[] result : results.values()) {
for (DatanodeStorageReport node : result) {
String nodeId = node.getDatanodeInfo().getXferAddr();
Map<String, DatanodeStorageReport[]> dnSubcluster =
getDatanodeStorageReportMap(type);
// Avoid repeating machines in multiple subclusters
Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
String nodeId = dnInfo.getXferAddr();
if (!datanodesMap.containsKey(nodeId)) {
datanodesMap.put(nodeId, node);
datanodesMap.put(nodeId, dn);
}
// TODO merge somehow, right now it just takes the first one
}
}
Collection<DatanodeStorageReport> datanodes = datanodesMap.values();
// TODO sort somehow
DatanodeStorageReport[] combinedData =
new DatanodeStorageReport[datanodes.size()];
combinedData = datanodes.toArray(combinedData);
return combinedData;
}
/**
* Get the list of datanodes per subcluster.
*
* @param type Type of the datanodes to get.
* @return nsId -> datanode list.
* @throws IOException
*/
public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
DatanodeReportType type) throws IOException {
Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
new Class<?>[] {DatanodeReportType.class}, type);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
rpcClient.invokeConcurrent(
nss, method, true, false, DatanodeStorageReport[].class);
for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
String nsId = ns.getNameserviceId();
DatanodeStorageReport[] result = entry.getValue();
ret.put(nsId, result);
}
return ret;
}
@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {

View File

@ -295,6 +295,8 @@ private DestinationOrder convert(DestOrder order) {
return DestinationOrder.LOCAL;
case RANDOM:
return DestinationOrder.RANDOM;
case HASH_ALL:
return DestinationOrder.HASH_ALL;
default:
return DestinationOrder.HASH;
}
@ -306,6 +308,8 @@ private DestOrder convert(DestinationOrder order) {
return DestOrder.LOCAL;
case RANDOM:
return DestOrder.RANDOM;
case HASH_ALL:
return DestOrder.HASH_ALL;
default:
return DestOrder.HASH;
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.io.MD5Hash;
/**
* Consistent hash ring to distribute items across nodes (locations). If we add
* or remove nodes, it minimizes the item migration.
*/
public class ConsistentHashRing {
private static final String SEPERATOR = "/";
private static final String VIRTUAL_NODE_FORMAT = "%s" + SEPERATOR + "%d";
/** Hash ring. */
private SortedMap<String, String> ring = new TreeMap<String, String>();
/** Entry -> num virtual nodes on ring. */
private Map<String, Integer> entryToVirtualNodes =
new HashMap<String, Integer>();
/** Synchronization. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public ConsistentHashRing(Set<String> locations) {
for (String location : locations) {
addLocation(location);
}
}
/**
* Add entry to consistent hash ring.
*
* @param location Node to add to the ring.
*/
public void addLocation(String location) {
addLocation(location, 100);
}
/**
* Add entry to consistent hash ring.
*
* @param location Node to add to the ring.
* @param numVirtualNodes Number of virtual nodes to add.
*/
public void addLocation(String location, int numVirtualNodes) {
writeLock.lock();
try {
entryToVirtualNodes.put(location, numVirtualNodes);
for (int i = 0; i < numVirtualNodes; i++) {
String key = String.format(VIRTUAL_NODE_FORMAT, location, i);
String hash = getHash(key);
ring.put(hash, key);
}
} finally {
writeLock.unlock();
}
}
/**
* Remove specified entry from hash ring.
*
* @param location Node to remove from the ring.
*/
public void removeLocation(String location) {
writeLock.lock();
try {
Integer numVirtualNodes = entryToVirtualNodes.remove(location);
for (int i = 0; i < numVirtualNodes; i++) {
String key = String.format(VIRTUAL_NODE_FORMAT, location, i);
String hash = getHash(key);
ring.remove(hash);
}
} finally {
writeLock.unlock();
}
}
/**
* Return the location (owner) of the specified item. The owner is the next
* entry on the hash ring (with a hash value greater than the item's hash).
* @param item Item to look for.
* @return The location of the item.
*/
public String getLocation(String item) {
readLock.lock();
try {
if (ring.isEmpty()) {
return null;
}
String hash = getHash(item);
if (!ring.containsKey(hash)) {
SortedMap<String, String> tailMap = ring.tailMap(hash);
hash = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
}
String virtualNode = ring.get(hash);
int index = virtualNode.lastIndexOf(SEPERATOR);
if (index >= 0) {
return virtualNode.substring(0, index);
} else {
return virtualNode;
}
} finally {
readLock.unlock();
}
}
public String getHash(String key) {
return MD5Hash.digest(key).toString();
}
/**
* Get the locations in the ring.
* @return Set of locations in the ring.
*/
public Set<String> getLocations() {
return entryToVirtualNodes.keySet();
}
}
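
A hedged standalone sketch of the ring (the subcluster names are illustrative; java.util imports assumed):

// Items map consistently to locations; adding a location only remaps
// the items that now fall on its virtual nodes.
Set<String> locations = new TreeSet<>();
locations.add("subcluster0");
locations.add("subcluster1");
ConsistentHashRing ring = new ConsistentHashRing(locations);
String owner = ring.getLocation("/data/file0.txt"); // e.g. "subcluster0"
ring.addLocation("subcluster2");
// Most items keep their previous owner after the addition.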

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Assorted utility classes and helpers for HDFS Federation.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.server.federation.utils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -88,7 +88,8 @@ public RouterAdmin(Configuration conf) {
public void printUsage() {
String usage = "Federation Admin Tools:\n"
+ "\t[-add <source> <nameservice> <destination> "
+ "[-readonly] -owner <owner> -group <group> -mode <mode>]\n"
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]\n"
+ "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n"
+ "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "

View File

@ -129,6 +129,7 @@ message MountTableRecordProto {
HASH = 0;
LOCAL = 1;
RANDOM = 2;
HASH_ALL = 3;
}
optional DestOrder destOrder = 6 [default = HASH];

View File

@ -497,9 +497,9 @@ public void testLocationCache() throws Exception {
map2);
entries.add(entry2);
mountTable.refreshEntries(entries);
assertEquals("1->/testlocationcache/",
assertEquals("1->/testlocationcache",
mountTable.getDestinationForPath("/testlocationcache").toString());
assertEquals("2->/anothertestlocationcache/",
assertEquals("2->/anothertestlocationcache",
mountTable.getDestinationForPath("/anothertestlocationcache")
.toString());
@ -518,7 +518,7 @@ public void testLocationCache() throws Exception {
mountTable.refreshEntries(entries);
// Ensure the location cache updates correctly
assertEquals("3->/testlocationcache/",
assertEquals("3->/testlocationcache",
mountTable.getDestinationForPath("/testlocationcache").toString());
// Cleanup before exit

View File

@ -0,0 +1,419 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import static org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver.extractTempFileName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.junit.Before;
import org.junit.Test;
/**
* Test the multiple destination resolver.
*/
public class TestMultipleDestinationResolver {
private MultipleDestinationMountTableResolver resolver;
@Before
public void setup() throws IOException {
Configuration conf = new Configuration();
resolver = new MultipleDestinationMountTableResolver(conf, null);
// We manually point /tmp to only subcluster0
Map<String, String> map1 = new HashMap<>();
map1.put("subcluster0", "/tmp");
resolver.addEntry(MountTable.newInstance("/tmp", map1));
// We manually point / to subcluster0,1,2 with default order (hash)
Map<String, String> mapDefault = new HashMap<>();
mapDefault.put("subcluster0", "/");
mapDefault.put("subcluster1", "/");
mapDefault.put("subcluster2", "/");
MountTable defaultEntry = MountTable.newInstance("/", mapDefault);
resolver.addEntry(defaultEntry);
// We manually point /hash to subcluster0,1,2 with hashing
Map<String, String> mapHash = new HashMap<>();
mapHash.put("subcluster0", "/hash");
mapHash.put("subcluster1", "/hash");
mapHash.put("subcluster2", "/hash");
MountTable hashEntry = MountTable.newInstance("/hash", mapHash);
hashEntry.setDestOrder(DestinationOrder.HASH);
resolver.addEntry(hashEntry);
// We manually point /hashall to subcluster0,1,2 with hashing (full tree)
Map<String, String> mapHashAll = new HashMap<>();
mapHashAll.put("subcluster0", "/hashall");
mapHashAll.put("subcluster1", "/hashall");
mapHashAll.put("subcluster2", "/hashall");
MountTable hashEntryAll = MountTable.newInstance("/hashall", mapHashAll);
hashEntryAll.setDestOrder(DestinationOrder.HASH_ALL);
resolver.addEntry(hashEntryAll);
// We point /local to subclusters 0, 1, 2 with the local order
Map<String, String> mapLocal = new HashMap<>();
mapLocal.put("subcluster0", "/local");
mapLocal.put("subcluster1", "/local");
mapLocal.put("subcluster2", "/local");
MountTable localEntry = MountTable.newInstance("/local", mapLocal);
localEntry.setDestOrder(DestinationOrder.LOCAL);
resolver.addEntry(localEntry);
// We point /random to subclusters 0, 1, 2 with the random order
Map<String, String> mapRandom = new HashMap<>();
mapRandom.put("subcluster0", "/random");
mapRandom.put("subcluster1", "/random");
mapRandom.put("subcluster2", "/random");
MountTable randomEntry = MountTable.newInstance("/random", mapRandom);
randomEntry.setDestOrder(DestinationOrder.RANDOM);
resolver.addEntry(randomEntry);
// Read only mount point
Map<String, String> mapReadOnly = new HashMap<>();
mapReadOnly.put("subcluster0", "/readonly");
mapReadOnly.put("subcluster1", "/readonly");
mapReadOnly.put("subcluster2", "/readonly");
MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly);
readOnlyEntry.setReadOnly(true);
resolver.addEntry(readOnlyEntry);
}
@Test
public void testHashEqualDistribution() throws IOException {
// First level
testEvenDistribution("/hash");
testEvenDistribution("/hash/folder0", false);
// All levels
testEvenDistribution("/hashall");
testEvenDistribution("/hashall/folder0");
}
@Test
public void testHashAll() throws IOException {
// Files should be spread across subclusters
PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
assertDest("subcluster0", dest0);
PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
assertDest("subcluster1", dest1);
// Files within folder should be spread across subclusters
PathLocation dest2 = resolver.getDestinationForPath("/hashall/folder0");
assertDest("subcluster2", dest2);
PathLocation dest3 = resolver.getDestinationForPath(
"/hashall/folder0/file0.txt");
assertDest("subcluster1", dest3);
PathLocation dest4 = resolver.getDestinationForPath(
"/hashall/folder0/file1.txt");
assertDest("subcluster0", dest4);
PathLocation dest5 = resolver.getDestinationForPath(
"/hashall/folder0/folder0/file0.txt");
assertDest("subcluster1", dest5);
PathLocation dest6 = resolver.getDestinationForPath(
"/hashall/folder0/folder0/file1.txt");
assertDest("subcluster1", dest6);
PathLocation dest7 = resolver.getDestinationForPath(
"/hashall/folder0/folder0/file2.txt");
assertDest("subcluster0", dest7);
PathLocation dest8 = resolver.getDestinationForPath("/hashall/folder1");
assertDest("subcluster1", dest8);
PathLocation dest9 = resolver.getDestinationForPath(
"/hashall/folder1/file0.txt");
assertDest("subcluster0", dest9);
PathLocation dest10 = resolver.getDestinationForPath(
"/hashall/folder1/file1.txt");
assertDest("subcluster1", dest10);
PathLocation dest11 = resolver.getDestinationForPath("/hashall/folder2");
assertDest("subcluster2", dest11);
PathLocation dest12 = resolver.getDestinationForPath(
"/hashall/folder2/file0.txt");
assertDest("subcluster0", dest12);
PathLocation dest13 = resolver.getDestinationForPath(
"/hashall/folder2/file1.txt");
assertDest("subcluster0", dest13);
PathLocation dest14 = resolver.getDestinationForPath(
"/hashall/folder2/file2.txt");
assertDest("subcluster1", dest14);
}
@Test
public void testHashFirst() throws IOException {
PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
assertDest("subcluster0", dest0);
PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
assertDest("subcluster1", dest1);
// All these must be in the same location: subcluster0
PathLocation dest2 = resolver.getDestinationForPath("/hash/folder0");
assertDest("subcluster0", dest2);
PathLocation dest3 = resolver.getDestinationForPath(
"/hash/folder0/file0.txt");
assertDest("subcluster0", dest3);
PathLocation dest4 = resolver.getDestinationForPath(
"/hash/folder0/file1.txt");
assertDest("subcluster0", dest4);
PathLocation dest5 = resolver.getDestinationForPath(
"/hash/folder0/folder0/file0.txt");
assertDest("subcluster0", dest5);
PathLocation dest6 = resolver.getDestinationForPath(
"/hash/folder0/folder0/file1.txt");
assertDest("subcluster0", dest6);
// All these must be in the same location: subcluster2
PathLocation dest7 = resolver.getDestinationForPath("/hash/folder1");
assertDest("subcluster2", dest7);
PathLocation dest8 = resolver.getDestinationForPath(
"/hash/folder1/file0.txt");
assertDest("subcluster2", dest8);
PathLocation dest9 = resolver.getDestinationForPath(
"/hash/folder1/file1.txt");
assertDest("subcluster2", dest9);
// All these must be in the same location: subcluster2
PathLocation dest10 = resolver.getDestinationForPath("/hash/folder2");
assertDest("subcluster2", dest10);
PathLocation dest11 = resolver.getDestinationForPath(
"/hash/folder2/file0.txt");
assertDest("subcluster2", dest11);
PathLocation dest12 = resolver.getDestinationForPath(
"/hash/folder2/file1.txt");
assertDest("subcluster2", dest12);
}
@Test
public void testRandomEqualDistribution() throws IOException {
testEvenDistribution("/random");
}
@Test
public void testSingleDestination() throws IOException {
// All the files in /tmp should be in subcluster0
for (int f = 0; f < 100; f++) {
String filename = "/tmp/b/c/file" + f + ".txt";
PathLocation destination = resolver.getDestinationForPath(filename);
RemoteLocation loc = destination.getDefaultLocation();
assertEquals("subcluster0", loc.getNameserviceId());
assertEquals(filename, loc.getDest());
}
}
@Test
public void testResolveSubdirectories() throws Exception {
// Simulate a testdir under a multi-destination mount.
Random r = new Random();
String testDir = "/sort/testdir" + r.nextInt();
String file1 = testDir + "/file1" + r.nextInt();
String file2 = testDir + "/file2" + r.nextInt();
// Verify both files resolve to the same namespace as the parent dir.
PathLocation testDirLocation = resolver.getDestinationForPath(testDir);
RemoteLocation defaultLoc = testDirLocation.getDefaultLocation();
String testDirNamespace = defaultLoc.getNameserviceId();
PathLocation file1Location = resolver.getDestinationForPath(file1);
RemoteLocation defaultLoc1 = file1Location.getDefaultLocation();
assertEquals(testDirNamespace, defaultLoc1.getNameserviceId());
PathLocation file2Location = resolver.getDestinationForPath(file2);
RemoteLocation defaultLoc2 = file2Location.getDefaultLocation();
assertEquals(testDirNamespace, defaultLoc2.getNameserviceId());
}
@Test
public void testExtractTempFileName() {
for (String teststring : new String[] {
"testfile1.txt.COPYING",
"testfile1.txt._COPYING_",
"testfile1.txt._COPYING_.attempt_1486662804109_0055_m_000042_0",
"testfile1.txt.tmp",
"_temp/testfile1.txt",
"_temporary/testfile1.txt.af77e2ab-4bc5-4959-ae08-299c880ee6b8",
"_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
"testfile1.txt" }) {
String finalName = extractTempFileName(teststring);
assertEquals("testfile1.txt", finalName);
}
// False cases
assertEquals(
"file1.txt.COPYING1", extractTempFileName("file1.txt.COPYING1"));
assertEquals("file1.txt.tmp2", extractTempFileName("file1.txt.tmp2"));
// Speculation patterns
String finalName = extractTempFileName(
"_temporary/part-00007.af77e2ab-4bc5-4959-ae08-299c880ee6b8");
assertEquals("part-00007", finalName);
finalName = extractTempFileName(
"_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
"part-00003");
assertEquals("part-00003", finalName);
// Subfolders
finalName = extractTempFileName("folder0/testfile1.txt._COPYING_");
assertEquals("folder0/testfile1.txt", finalName);
finalName = extractTempFileName(
"folder0/folder1/testfile1.txt._COPYING_");
assertEquals("folder0/folder1/testfile1.txt", finalName);
finalName = extractTempFileName(
"processedHrsData.txt/_temporary/0/_temporary/" +
"attempt_201706281636_0007_m_000003_46/part-00003");
assertEquals("processedHrsData.txt/part-00003", finalName);
}
@Test
public void testReadOnly() throws IOException {
MountTable mount = resolver.getMountPoint("/readonly");
assertTrue(mount.isReadOnly());
PathLocation dest0 = resolver.getDestinationForPath("/readonly/file0.txt");
assertDest("subcluster1", dest0);
PathLocation dest1 = resolver.getDestinationForPath("/readonly/file1.txt");
assertDest("subcluster2", dest1);
// All these must be in the same location: subcluster1
PathLocation dest2 = resolver.getDestinationForPath("/readonly/folder0");
assertDest("subcluster1", dest2);
PathLocation dest3 = resolver.getDestinationForPath(
"/readonly/folder0/file0.txt");
assertDest("subcluster1", dest3);
PathLocation dest4 = resolver.getDestinationForPath(
"/readonly/folder0/file1.txt");
assertDest("subcluster1", dest4);
PathLocation dest5 = resolver.getDestinationForPath(
"/readonly/folder0/folder0/file0.txt");
assertDest("subcluster1", dest5);
PathLocation dest6 = resolver.getDestinationForPath(
"/readonly/folder0/folder0/file1.txt");
assertDest("subcluster1", dest6);
// All these must be in the same location: subcluster2
PathLocation dest7 = resolver.getDestinationForPath("/readonly/folder1");
assertDest("subcluster2", dest7);
PathLocation dest8 = resolver.getDestinationForPath(
"/readonly/folder1/file0.txt");
assertDest("subcluster2", dest8);
PathLocation dest9 = resolver.getDestinationForPath(
"/readonly/folder1/file1.txt");
assertDest("subcluster2", dest9);
// All these must be in the same location: subcluster1
PathLocation dest10 = resolver.getDestinationForPath("/readonly/folder2");
assertDest("subcluster1", dest10);
PathLocation dest11 = resolver.getDestinationForPath(
"/readonly/folder2/file0.txt");
assertDest("subcluster1", dest11);
PathLocation dest12 = resolver.getDestinationForPath(
"/readonly/folder2/file1.txt");
assertDest("subcluster1", dest12);
}
@Test
public void testLocalResolver() throws IOException {
PathLocation dest0 =
resolver.getDestinationForPath("/local/folder0/file0.txt");
assertDest("subcluster0", dest0);
}
@Test
public void testRandomResolver() throws IOException {
Set<String> destinations = new HashSet<>();
for (int i = 0; i < 30; i++) {
PathLocation dest =
resolver.getDestinationForPath("/random/folder0/file0.txt");
RemoteLocation firstDest = dest.getDestinations().get(0);
String nsId = firstDest.getNameserviceId();
destinations.add(nsId);
}
assertEquals(3, destinations.size());
}
/**
* Test that a path has files distributed across destinations evenly.
* @param path Path to check.
* @throws IOException
*/
private void testEvenDistribution(final String path) throws IOException {
testEvenDistribution(path, true);
}
/**
* Test that a path has files distributed across destinations evenly or not.
* @param path Path to check.
* @param even If the distribution should be even or not.
* @throws IOException If it cannot check it.
*/
private void testEvenDistribution(final String path, final boolean even)
throws IOException {
// Subcluster -> Files
Map<String, Set<String>> results = new HashMap<>();
for (int f = 0; f < 10000; f++) {
String filename = path + "/file" + f + ".txt";
PathLocation destination = resolver.getDestinationForPath(filename);
RemoteLocation loc = destination.getDefaultLocation();
assertEquals(filename, loc.getDest());
String nsId = loc.getNameserviceId();
if (!results.containsKey(nsId)) {
results.put(nsId, new TreeSet<>());
}
results.get(nsId).add(filename);
}
if (!even) {
// All files should be in one subcluster
assertEquals(1, results.size());
} else {
// Files should be distributed somewhat evenly
assertEquals(3, results.size());
int count = 0;
for (Set<String> files : results.values()) {
count = count + files.size();
}
int avg = count / results.keySet().size();
for (Set<String> files : results.values()) {
int filesCount = files.size();
// Check that the count in each namespace is within 20% of avg
assertTrue(filesCount > 0);
assertTrue(Math.abs(filesCount - avg) < (avg / 5));
}
}
}
private static void assertDest(String expectedDest, PathLocation loc) {
assertEquals(expectedDest, loc.getDestinations().get(0).getNameserviceId());
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test the {@link LocalResolver}.
*/
public class TestLocalResolver {
@Test
@SuppressWarnings("unchecked")
public void testLocalResolver() throws IOException {
// Mock the subcluster mapping
Configuration conf = new Configuration();
Router router = mock(Router.class);
StateStoreService stateStore = mock(StateStoreService.class);
MembershipStore membership = mock(MembershipStore.class);
when(router.getStateStore()).thenReturn(stateStore);
when(stateStore.getRegisteredRecordStore(any(Class.class)))
.thenReturn(membership);
GetNamenodeRegistrationsResponse response =
GetNamenodeRegistrationsResponse.newInstance();
// Set the mapping for each client
List<MembershipState> records = new LinkedList<>();
records.add(newMembershipState("client0", "subcluster0"));
records.add(newMembershipState("client1", "subcluster1"));
records.add(newMembershipState("client2", "subcluster2"));
response.setNamenodeMemberships(records);
when(membership.getNamenodeRegistrations(
any(GetNamenodeRegistrationsRequest.class))).thenReturn(response);
// Mock the client resolution: it returns whatever sb currently holds
StringBuilder sb = new StringBuilder("clientX");
LocalResolver localResolver = new LocalResolver(conf, router);
LocalResolver spyLocalResolver = spy(localResolver);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
return sb.toString();
}
}).when(spyLocalResolver).getClientAddr();
// Add the mocks to the resolver
MultipleDestinationMountTableResolver resolver =
new MultipleDestinationMountTableResolver(conf, router);
resolver.addResolver(DestinationOrder.LOCAL, spyLocalResolver);
// We point /local to subclusters 0, 1, 2 with the local order
Map<String, String> mapLocal = new HashMap<>();
mapLocal.put("subcluster0", "/local");
mapLocal.put("subcluster1", "/local");
mapLocal.put("subcluster2", "/local");
MountTable localEntry = MountTable.newInstance("/local", mapLocal);
localEntry.setDestOrder(DestinationOrder.LOCAL);
resolver.addEntry(localEntry);
// Test first with the default destination
PathLocation dest = resolver.getDestinationForPath("/local/file0.txt");
assertDestination("subcluster0", dest);
// We change the client location and verify
setClient(sb, "client2");
dest = resolver.getDestinationForPath("/local/file0.txt");
assertDestination("subcluster2", dest);
setClient(sb, "client1");
dest = resolver.getDestinationForPath("/local/file0.txt");
assertDestination("subcluster1", dest);
setClient(sb, "client0");
dest = resolver.getDestinationForPath("/local/file0.txt");
assertDestination("subcluster0", dest);
}
private void assertDestination(String expectedNsId, PathLocation loc) {
List<RemoteLocation> dests = loc.getDestinations();
RemoteLocation dest = dests.get(0);
assertEquals(expectedNsId, dest.getNameserviceId());
}
private MembershipState newMembershipState(String addr, String nsId) {
return MembershipState.newInstance(
"routerId", nsId, "nn0", "cluster0", "blockPool0",
addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004",
FederationNamenodeServiceState.ACTIVE, false);
}
/**
* Set the address of the client issuing the request. We use a StringBuilder
* to modify the value in place for the mock.
* @param sb StringBuilder to set the client string.
* @param client Address of the client.
*/
private static void setClient(StringBuilder sb, String client) {
sb.replace(0, sb.length(), client);
}
}

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@ -40,6 +41,7 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.Time;
@ -172,6 +174,52 @@ public void testAddReadOnlyMountTable() throws IOException {
MountTable record = getMountTableEntry("/readonly");
assertEquals("/readonly", record.getSourcePath());
assertTrue(record.isReadOnly());
// Removing the new entry
RemoveMountTableEntryRequest removeRequest =
RemoveMountTableEntryRequest.newInstance("/readonly");
RemoveMountTableEntryResponse removeResponse =
mountTable.removeMountTableEntry(removeRequest);
assertTrue(removeResponse.getStatus());
}
@Test
public void testAddOrderMountTable() throws IOException {
testAddOrderMountTable(DestinationOrder.HASH);
testAddOrderMountTable(DestinationOrder.LOCAL);
testAddOrderMountTable(DestinationOrder.RANDOM);
testAddOrderMountTable(DestinationOrder.HASH_ALL);
}
private void testAddOrderMountTable(final DestinationOrder order)
throws IOException {
final String mnt = "/" + order;
MountTable newEntry = MountTable.newInstance(
mnt, Collections.singletonMap("ns0", "/testdir"),
Time.now(), Time.now());
newEntry.setDestOrder(order);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Add
AddMountTableEntryRequest addRequest;
AddMountTableEntryResponse addResponse;
addRequest = AddMountTableEntryRequest.newInstance(newEntry);
addResponse = mountTable.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
// Check that the entry has the expected order
MountTable record = getMountTableEntry(mnt);
assertEquals(mnt, record.getSourcePath());
assertEquals(order, record.getDestOrder());
// Removing the new entry
RemoveMountTableEntryRequest removeRequest =
RemoveMountTableEntryRequest.newInstance(mnt);
RemoveMountTableEntryResponse removeResponse =
mountTable.removeMountTableEntry(removeRequest);
assertTrue(removeResponse.getStatus());
}
@Test

View File

@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
@ -32,7 +33,9 @@
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@ -48,6 +51,7 @@
import org.junit.Test;
import com.google.common.base.Supplier;
/**
* Tests Router admin commands.
*/
@ -141,6 +145,36 @@ public void testAddMountTable() throws Exception {
assertTrue(mountTable.isReadOnly());
}
@Test
public void testAddOrderMountTable() throws Exception {
testAddOrderMountTable(DestinationOrder.HASH);
testAddOrderMountTable(DestinationOrder.LOCAL);
testAddOrderMountTable(DestinationOrder.RANDOM);
testAddOrderMountTable(DestinationOrder.HASH_ALL);
}
private void testAddOrderMountTable(DestinationOrder order)
throws Exception {
final String mnt = "/" + order;
final String nsId = "ns0,ns1";
final String dest = "/";
String[] argv = new String[] {
"-add", mnt, nsId, dest, "-order", order.toString()};
assertEquals(0, ToolRunner.run(admin, argv));
// Check the state in the State Store
stateStore.loadCache(MountTableStoreImpl.class, true);
MountTableManager mountTable = client.getMountTableManager();
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance(mnt);
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
List<MountTable> entries = response.getEntries();
assertEquals(1, entries.size());
assertEquals(2, entries.get(0).getDestinations().size());
assertEquals(order, response.getEntries().get(0).getDestOrder());
}
@Test
public void testListMountTable() throws Exception {
String nsId = "ns0";