diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
index 27b43e54f2..3c45faf999 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
@@ -240,7 +240,7 @@ private void invalidateLocationCache(final String path) {
       PathLocation loc = entry.getValue();
       String src = loc.getSourcePath();
       if (src != null) {
-        if(isParentEntry(src, path)) {
+        if (isParentEntry(src, path)) {
           LOG.debug("Removing {}", src);
           it.remove();
         }
@@ -306,7 +306,7 @@ public void refreshEntries(final Collection<MountTable> entries) {
             existingEntry, entry);
         this.tree.put(srcPath, entry);
         invalidateLocationCache(srcPath);
-        LOG.info("Updated mount point {} in resolver");
+        LOG.info("Updated mount point {} in resolver", srcPath);
       }
     }
   }
@@ -515,7 +515,7 @@ private static PathLocation buildLocation(
       String nsId = oneDst.getNameserviceId();
       String dest = oneDst.getDest();
       String newPath = dest;
-      if (!newPath.endsWith(Path.SEPARATOR)) {
+      if (!newPath.endsWith(Path.SEPARATOR) && !remainingPath.isEmpty()) {
         newPath += Path.SEPARATOR;
       }
       newPath += remainingPath;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
new file mode 100644
index 0000000000..8dd73ecaff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.io.IOException;
+import java.util.EnumMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.OrderedResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.RandomResolver;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mount table resolver that supports multiple locations for each mount entry.
+ * The returned location contains remote paths prioritized from the highest to
+ * the lowest priority. Multiple locations for a mount point are optional.
+ * When multiple locations are specified, all of them are checked for the
+ * presence of a file, and the nameservice for a new file/dir is chosen based
+ * on the results of a consistent hashing algorithm.
+ *
+ * If the mount table entry for a path has a single destination, that location
+ * is returned directly. If it has multiple destinations, all locations are
+ * returned, with the preferred one first according to the ordering policy.
+ *
+ * It has multiple options to order the locations: HASH (default), LOCAL,
+ * RANDOM, and HASH_ALL.
+ *
+ * The consistent hashing result is dependent on the number and combination
+ * of nameservices that are registered for a particular mount point. The
+ * order of nameservices/locations in the mount table is not prioritized.
+ * Each consistent hash calculation considers only the set of unique
+ * nameservices present for the mount table location.
+ */
+public class MultipleDestinationMountTableResolver extends MountTableResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MultipleDestinationMountTableResolver.class);
+
+
+  /** Resolvers that use a particular order for multiple destinations. */
+  private EnumMap<DestinationOrder, OrderedResolver> orderedResolvers =
+      new EnumMap<>(DestinationOrder.class);
+
+
+  public MultipleDestinationMountTableResolver(
+      Configuration conf, Router router) {
+    super(conf, router);
+
+    // Initialize the ordered resolvers
+    addResolver(DestinationOrder.HASH, new HashFirstResolver());
+    addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
+    addResolver(DestinationOrder.RANDOM, new RandomResolver());
+    addResolver(DestinationOrder.HASH_ALL, new HashResolver());
+  }
+
+  @Override
+  public PathLocation getDestinationForPath(String path) throws IOException {
+    PathLocation mountTableResult = super.getDestinationForPath(path);
+    if (mountTableResult == null) {
+      LOG.error("The {} cannot find a location for {}",
+          super.getClass().getSimpleName(), path);
+    } else if (mountTableResult.hasMultipleDestinations()) {
+      DestinationOrder order = mountTableResult.getDestinationOrder();
+      OrderedResolver orderedResolver = orderedResolvers.get(order);
+      if (orderedResolver == null) {
+        LOG.error("Cannot find resolver for order {}", order);
+      } else {
+        String firstNamespace =
+            orderedResolver.getFirstNamespace(path, mountTableResult);
+
+        // Change the order of the namespaces according to the policy
+        if (firstNamespace != null) {
+          // This is the entity in the tree; we need to create our own copy
+          mountTableResult = new PathLocation(mountTableResult, firstNamespace);
+          LOG.debug("Ordered locations following {} are {}",
+              order, mountTableResult);
+        } else {
+          LOG.error("Cannot get main namespace for path {} with order {}",
+              path, order);
+        }
+      }
+    }
+    return mountTableResult;
+  }
+
+  @VisibleForTesting
+  public void addResolver(DestinationOrder order, OrderedResolver resolver) {
+    orderedResolvers.put(order, resolver);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 4bccf1097a..03e68e5aaa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -23,7 +23,8 @@
  * this determines which location should be checked first.
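[Editor's note] The enum that follows distinguishes HASH from HASH_ALL only by how much of the path feeds the hash. A minimal standalone sketch (not part of the patch; the class and the `trimToChild` helper are illustrative, mirroring `HashFirstResolver#trimPathToChild` introduced later in this diff):

```java
// Illustrative only: contrast the keys HASH and HASH_ALL would hash for a
// path under a hypothetical mount point /data.
public class OrderKeyDemo {

  /** Simplified trim: keep only the first path component under the parent. */
  static String trimToChild(String path, String parent) {
    String remainder = path.substring(parent.length()).replaceFirst("^/", "");
    String first = remainder.split("/")[0];
    return first.isEmpty() ? parent : parent + "/" + first;
  }

  public static void main(String[] args) {
    String path = "/data/dir0/part-00000";
    // HASH hashes only up to the immediate child, so every file under
    // /data/dir0 lands on the same subcluster.
    System.out.println(trimToChild(path, "/data")); // /data/dir0
    // HASH_ALL hashes the complete path, spreading files across subclusters.
    System.out.println(path);                       // /data/dir0/part-00000
  }
}
```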
  */
 public enum DestinationOrder {
-  HASH, // Follow consistent hashing
+  HASH, // Follow consistent hashing in the first folder level
   LOCAL, // Local first
-  RANDOM // Random order
+  RANDOM, // Random order
+  HASH_ALL // Follow consistent hashing
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
new file mode 100644
index 0000000000..831b0820e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+
+/**
+ * Variation of HashResolver that only uses the first level of the path.
+ */
+public class HashFirstResolver extends HashResolver {
+
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String srcPath = loc.getSourcePath();
+    String trimmedPath = trimPathToChild(path, srcPath);
+    LOG.debug("Only using the first part of the path: {} -> {}",
+        path, trimmedPath);
+    return super.getFirstNamespace(trimmedPath, loc);
+  }
+
+  /**
+   * Hash only up to the immediate child of the mount point. This prevents the
+   * need to create/maintain subtrees under each multi-destination mount point.
+   * Each child of a multi-destination mount is mapped to only one hash
+   * location.
+   *
+   * Trims a path to at most the immediate child of a parent path. For
+   * example, path = /a/b/c with parent = /a is trimmed to /a/b, and
+   * path = /a/b with parent = /a/b stays /a/b.
+   *
+   * @param path The path to trim.
+   * @param parent The parent used to find the immediate child.
+   * @return Trimmed path.
+   */
+  private static String trimPathToChild(String path, String parent) {
+    // Path is invalid or equal to the parent
+    if (path.length() <= parent.length()) {
+      return parent;
+    }
+    String remainder = path.substring(parent.length());
+    String[] components =
+        remainder.replaceFirst("^/", "").split(Path.SEPARATOR);
+    if (components.length > 0 && components[0].length() > 0) {
+      if (parent.endsWith(Path.SEPARATOR)) {
+        return parent + components[0];
+      } else {
+        return parent + Path.SEPARATOR + components[0];
+      }
+    } else {
+      return parent;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
new file mode 100644
index 0000000000..4034a46a65
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.utils.ConsistentHashRing;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Order the destinations based on consistent hashing.
+ */
+public class HashResolver implements OrderedResolver {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(HashResolver.class);
+
+
+  /** Namespace set hash -> Locator. */
+  private final Map<Integer, ConsistentHashRing> hashResolverMap;
+
+  /** Patterns for temporary files. */
+  private static final String HEX_PATTERN = "\\p{XDigit}";
+  private static final String UUID_PATTERN = HEX_PATTERN + "{8}-" +
+      HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" +
+      HEX_PATTERN + "{12}";
+  private static final String ATTEMPT_PATTERN =
+      "attempt_\\d+_\\d{4}_._\\d{6}_\\d{2}";
+  private static final String[] TEMP_FILE_PATTERNS = {
+      "(.+)\\.COPYING$",
+      "(.+)\\._COPYING_.*$",
+      "(.+)\\.tmp$",
+      "_temp/(.+)$",
+      "_temporary/(.+)\\." + UUID_PATTERN + "$",
+      "(.*)_temporary/\\d/_temporary/" + ATTEMPT_PATTERN + "/(.+)$" };
+  /** Pattern for temporary files (OR of the individual patterns). */
+  private static final Pattern TEMP_FILE_PATTERN =
+      Pattern.compile(StringUtils.join("|", TEMP_FILE_PATTERNS));
+
+
+  public HashResolver() {
+    this.hashResolverMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Use the result from the consistent hashing locator to prioritize the
+   * locations for a path.
+   *
+   * @param path Path to check.
+   * @param loc Federated location with multiple destinations.
+   * @return First namespace based on hash.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String finalPath = extractTempFileName(path);
+    Set<String> namespaces = loc.getNamespaces();
+    ConsistentHashRing locator = getHashResolver(namespaces);
+    String hashedSubcluster = locator.getLocation(finalPath);
+    if (hashedSubcluster == null) {
+      String srcPath = loc.getSourcePath();
+      LOG.error("Cannot find subcluster for {} ({} -> {})",
+          srcPath, path, finalPath);
+    }
+    LOG.debug("Namespace for {} ({}) is {}", path, finalPath, hashedSubcluster);
+    return hashedSubcluster;
+  }
+
+  /**
+   * Get the cached (if available) or generate a new hash resolver for this
+   * particular set of unique namespace identifiers.
+   *
+   * @param namespaces A set of unique namespace identifiers.
+   * @return A hash resolver configured to consistently resolve paths to
+   *         namespaces using the provided set of namespace identifiers.
+   */
+  private ConsistentHashRing getHashResolver(final Set<String> namespaces) {
+    int hash = namespaces.hashCode();
+    ConsistentHashRing resolver = this.hashResolverMap.get(hash);
+    if (resolver == null) {
+      resolver = new ConsistentHashRing(namespaces);
+      this.hashResolverMap.put(hash, resolver);
+    }
+    return resolver;
+  }
+
+  /**
+   * Some files use a temporary naming pattern. Extract the final name from
+   * the temporary name. For example, files *._COPYING_ will be renamed, so
+   * we remove that chunk.
+   *
+   * @param input Input string.
+   * @return Final file name.
+   */
+  @VisibleForTesting
+  public static String extractTempFileName(final String input) {
+    StringBuilder sb = new StringBuilder();
+    Matcher matcher = TEMP_FILE_PATTERN.matcher(input);
+    if (matcher.find()) {
+      for (int i = 1; i <= matcher.groupCount(); i++) {
+        String match = matcher.group(i);
+        if (match != null) {
+          sb.append(match);
+        }
+      }
+    }
+    if (sb.length() > 0) {
+      String ret = sb.toString();
+      LOG.debug("Extracted {} from {}", ret, input);
+      return ret;
+    }
+    return input;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
new file mode 100644
index 0000000000..3508eab81f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The local subcluster (where the writer is) should be tried first. The writer
+ * is defined from the RPC query received in the RPC server.
+ */
+public class LocalResolver implements OrderedResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalResolver.class);
+
+  /** Configuration key to set the minimum time to update the local cache. */
+  public static final String MIN_UPDATE_PERIOD_KEY =
+      DFSConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
+  /** 10 seconds by default. */
+  private static final long MIN_UPDATE_PERIOD_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+
+
+  /** Router service. */
+  private final Router router;
+  /** Minimum update time. */
+  private final long minUpdateTime;
+
+  /** Node IP -> Subcluster. */
+  private Map<String, String> nodeSubcluster = null;
+  /** Last time the subcluster map was updated. */
+  private long lastUpdated;
+
+
+  public LocalResolver(final Configuration conf, final Router routerService) {
+    this.minUpdateTime = conf.getTimeDuration(
+        MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.router = routerService;
+  }
+
+  /**
+   * Get the local name space. This relies on the RPC Server to get the address
+   * from the client.
+   *
+   * TODO we only support DN and NN locations; we need to add others like
+   * Resource Managers.
+   *
+   * @param path Path ignored by this policy.
+   * @param loc Federated location with multiple destinations.
+   * @return Local name space. Null if we don't know about this machine.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String localSubcluster = null;
+    String clientAddr = getClientAddr();
+    Map<String, String> nodeToSubcluster = getSubclusterMappings();
+    if (nodeToSubcluster != null) {
+      localSubcluster = nodeToSubcluster.get(clientAddr);
+      if (localSubcluster != null) {
+        LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
+      } else {
+        LOG.error("Cannot get local namespace for {}", clientAddr);
+      }
+    } else {
+      LOG.error("Cannot get node mapping when resolving {} at {} from {}",
+          path, loc, clientAddr);
+    }
+    return localSubcluster;
+  }
+
+  @VisibleForTesting
+  String getClientAddr() {
+    return Server.getRemoteAddress();
+  }
+
+  /**
+   * Get the mapping from nodes to subcluster. It gets this mapping from the
+   * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
+   * too many calls. The cache might be updated asynchronously to reduce
+   * latency.
+   *
+   * @return Node IP -> Subcluster.
+   */
+  @VisibleForTesting
+  synchronized Map<String, String> getSubclusterMappings() {
+    if (nodeSubcluster == null ||
+        (monotonicNow() - lastUpdated) > minUpdateTime) {
+      // Fetch the mapping asynchronously
+      Thread updater = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          Map<String, String> mapping = new HashMap<>();
+
+          Map<String, String> dnSubcluster = getDatanodesSubcluster();
+          if (dnSubcluster != null) {
+            mapping.putAll(dnSubcluster);
+          }
+
+          Map<String, String> nnSubcluster = getNamenodesSubcluster();
+          if (nnSubcluster != null) {
+            mapping.putAll(nnSubcluster);
+          }
+          nodeSubcluster = mapping;
+          lastUpdated = monotonicNow();
+        }
+      });
+      updater.start();
+
+      // Wait until initialized
+      if (nodeSubcluster == null) {
+        try {
+          LOG.debug("Wait to get the mapping for the first time");
+          updater.join();
+        } catch (InterruptedException e) {
+          LOG.error("Cannot wait for the updater to finish");
+        }
+      }
+    }
+    return nodeSubcluster;
+  }
+
+  /**
+   * Get the Datanode mapping from the subclusters from the Namenodes. This
+   * needs to be done as a privileged action to use the user for the Router and
+   * not the one from the client in the RPC call.
+   *
+   * @return DN IP -> Subcluster.
+   */
+  private Map<String, String> getDatanodesSubcluster() {
+
+    final RouterRpcServer rpcServer = getRpcServer();
+    if (rpcServer == null) {
+      LOG.error("Cannot access the Router RPC server");
+      return null;
+    }
+
+    Map<String, String> ret = new HashMap<>();
+    try {
+      // We need to get the DNs as a privileged user
+      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+      Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs(
+          new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() {
+            @Override
+            public Map<String, DatanodeStorageReport[]> run() {
+              try {
+                return rpcServer.getDatanodeStorageReportMap(
+                    DatanodeReportType.ALL);
+              } catch (IOException e) {
+                LOG.error("Cannot get the datanodes from the RPC server", e);
+                return null;
+              }
+            }
+          });
+      for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) {
+        String nsId = entry.getKey();
+        DatanodeStorageReport[] dns = entry.getValue();
+        for (DatanodeStorageReport dn : dns) {
+          DatanodeInfo dnInfo = dn.getDatanodeInfo();
+          String ipAddr = dnInfo.getIpAddr();
+          ret.put(ipAddr, nsId);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage());
+    }
+    return ret;
+  }
+
+  /**
+   * Get the Namenode mapping from the subclusters from the Membership store.
+   * As the Routers are usually co-located with Namenodes, we also check for
+   * the local address for this Router here.
+   *
+   * @return NN IP -> Subcluster.
+   */
+  private Map<String, String> getNamenodesSubcluster() {
+
+    final MembershipStore membershipStore = getMembershipStore();
+    if (membershipStore == null) {
+      LOG.error("Cannot access the Membership store");
+      return null;
+    }
+
+    // Manage requests from this hostname (127.0.0.1)
+    String localIp = "127.0.0.1";
+    String localHostname = localIp;
+    try {
+      localHostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Cannot get local host name");
+    }
+
+    Map<String, String> ret = new HashMap<>();
+    try {
+      // Get the values from the store
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance();
+      GetNamenodeRegistrationsResponse response =
+          membershipStore.getNamenodeRegistrations(request);
+      final List<MembershipState> nns = response.getNamenodeMemberships();
+      for (MembershipState nn : nns) {
+        try {
+          String nsId = nn.getNameserviceId();
+          String rpcAddress = nn.getRpcAddress();
+          String hostname = HostAndPort.fromString(rpcAddress).getHostText();
+          ret.put(hostname, nsId);
+          if (hostname.equals(localHostname)) {
+            ret.put(localIp, nsId);
+          }
+
+          InetAddress addr = InetAddress.getByName(hostname);
+          String ipAddr = addr.getHostAddress();
+          ret.put(ipAddr, nsId);
+        } catch (Exception e) {
+          LOG.error("Cannot get address for {}: {}", nn, e.getMessage());
+        }
+      }
+    } catch (IOException ioe) {
+      LOG.error("Cannot get Namenodes from the State Store: {}",
+          ioe.getMessage());
+    }
+    return ret;
+  }
+
+  /**
+   * Get the Router RPC server.
+   *
+   * @return Router RPC server. Null if not possible.
+   */
+  private RouterRpcServer getRpcServer() {
+    if (this.router == null) {
+      return null;
+    }
+    return router.getRpcServer();
+  }
+
+  /**
+   * Get the Membership store.
+   *
+   * @return Membership store.
+   */
+  private MembershipStore getMembershipStore() {
+    StateStoreService stateStore = router.getStateStore();
+    if (stateStore == null) {
+      return null;
+    }
+    return stateStore.getRegisteredRecordStore(MembershipStore.class);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
new file mode 100644
index 0000000000..3a3ccf762f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+
+
+/**
+ * Policy that decides which should be the first location accessed given
+ * multiple destinations.
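[Editor's note] Because the interface below has a single method, custom policies can be plugged in through `MultipleDestinationMountTableResolver#addResolver`. A hypothetical sketch (the `PreferredResolver` class and its `preferred` field are illustrative, not part of the patch):

```java
package org.apache.hadoop.hdfs.server.federation.resolver.order;

import java.util.Set;

import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;

/** Hypothetical policy: prefer a fixed subcluster whenever it is available. */
public class PreferredResolver implements OrderedResolver {

  /** Subcluster to prefer (illustrative assumption). */
  private final String preferred;

  public PreferredResolver(String preferred) {
    this.preferred = preferred;
  }

  @Override
  public String getFirstNamespace(String path, PathLocation loc) {
    Set<String> namespaces = loc.getNamespaces();
    if (namespaces != null && namespaces.contains(preferred)) {
      return preferred;
    }
    // Null leaves the location order unchanged (the caller logs an error)
    return null;
  }
}
```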
+ */
+public interface OrderedResolver {
+
+  /**
+   * Get the first namespace based on this resolver approach.
+   *
+   * @param path Path to check.
+   * @param loc Federated location with multiple destinations.
+   * @return First namespace out of the locations.
+   */
+  String getFirstNamespace(String path, PathLocation loc);
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
new file mode 100644
index 0000000000..022aa48ef1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Order the destinations randomly.
+ */
+public class RandomResolver implements OrderedResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RandomResolver.class);
+
+
+  /** Random number generator. */
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Get a random name space from the path.
+   *
+   * @param path Path ignored by this policy.
+   * @param loc Federated location with multiple destinations.
+   * @return Random name space.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    if (loc == null) {
+      return null;
+    }
+    Set<String> namespaces = loc.getNamespaces();
+    if (namespaces == null || namespaces.isEmpty()) {
+      LOG.error("Cannot get namespaces for {}", loc);
+      return null;
+    }
+    List<String> nssList = new ArrayList<>(namespaces);
+    int index = RANDOM.nextInt(nssList.size());
+    return nssList.get(index);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index e0dfeb47b0..d282a7da92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1204,31 +1204,56 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
       DatanodeReportType type) throws IOException {
     checkOperation(OperationCategory.UNCHECKED);
 
-    Map<String, DatanodeStorageReport> datanodesMap = new HashMap<>();
-    RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
-        new Class<?>[] {DatanodeReportType.class}, type);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, DatanodeStorageReport[].class);
-    for (DatanodeStorageReport[] result : results.values()) {
-      for (DatanodeStorageReport node : result) {
-        String nodeId = node.getDatanodeInfo().getXferAddr();
+    Map<String, DatanodeStorageReport[]> dnSubcluster =
+        getDatanodeStorageReportMap(type);
+
+    // Avoid repeating machines in multiple subclusters
+    Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
+    for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
+      for (DatanodeStorageReport dn : dns) {
+        DatanodeInfo dnInfo = dn.getDatanodeInfo();
+        String nodeId = dnInfo.getXferAddr();
         if (!datanodesMap.containsKey(nodeId)) {
-          datanodesMap.put(nodeId, node);
+          datanodesMap.put(nodeId, dn);
        }
        // TODO merge somehow, right now it just takes the first one
      }
    }
 
     Collection<DatanodeStorageReport> datanodes = datanodesMap.values();
-    // TODO sort somehow
     DatanodeStorageReport[] combinedData =
         new DatanodeStorageReport[datanodes.size()];
     combinedData = datanodes.toArray(combinedData);
     return combinedData;
   }
 
+  /**
+   * Get the list of datanodes per subcluster.
+   *
+   * @param type Type of the datanodes to get.
+   * @return nsId -> datanode list.
+   * @throws IOException
+   */
+  public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
+      DatanodeReportType type) throws IOException {
+
+    Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
+        new Class<?>[] {DatanodeReportType.class}, type);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
+        rpcClient.invokeConcurrent(
+            nss, method, true, false, DatanodeStorageReport[].class);
+    for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
+        results.entrySet()) {
+      FederationNamespaceInfo ns = entry.getKey();
+      String nsId = ns.getNameserviceId();
+      DatanodeStorageReport[] result = entry.getValue();
+      ret.put(nsId, result);
+    }
+    return ret;
+  }
+
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 3beeca320d..9667489165 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -295,6 +295,8 @@ private DestinationOrder convert(DestOrder order) {
       return DestinationOrder.LOCAL;
     case RANDOM:
       return DestinationOrder.RANDOM;
+    case HASH_ALL:
+      return DestinationOrder.HASH_ALL;
     default:
       return DestinationOrder.HASH;
     }
@@ -306,6 +308,8 @@ private DestOrder convert(DestinationOrder order) {
       return DestOrder.LOCAL;
     case RANDOM:
       return DestOrder.RANDOM;
+    case HASH_ALL:
+      return DestOrder.HASH_ALL;
     default:
       return DestOrder.HASH;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java
new file mode 100644
index 0000000000..89273dbcee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * Consistent hash ring to distribute items across nodes (locations).
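[Editor's note] A usage sketch for the `ConsistentHashRing` added below (subcluster names and the demo class are illustrative):

```java
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hdfs.server.federation.utils.ConsistentHashRing;

public class RingDemo {
  public static void main(String[] args) {
    Set<String> subclusters = new TreeSet<>();
    subclusters.add("subcluster0");
    subclusters.add("subcluster1");
    subclusters.add("subcluster2");
    ConsistentHashRing ring = new ConsistentHashRing(subclusters);

    // Each item consistently maps to one location on the ring
    String owner = ring.getLocation("/data/dir0/part-00000");
    System.out.println("owner = " + owner);

    // Removing a location only remaps the items that node owned; items on
    // the other subclusters keep their placement.
    ring.removeLocation("subcluster2");
    System.out.println("after removal = " +
        ring.getLocation("/data/dir0/part-00000"));
  }
}
```

With 100 virtual nodes per location (the default used by `addLocation`), the keys spread roughly evenly around the ring.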
+ * If we add or remove nodes, it minimizes the item migration.
+ */
+public class ConsistentHashRing {
+  private static final String SEPARATOR = "/";
+  private static final String VIRTUAL_NODE_FORMAT = "%s" + SEPARATOR + "%d";
+
+  /** Hash ring. */
+  private SortedMap<String, String> ring = new TreeMap<String, String>();
+  /** Entry -> num virtual nodes on ring. */
+  private Map<String, Integer> entryToVirtualNodes =
+      new HashMap<String, Integer>();
+
+  /** Synchronization. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  public ConsistentHashRing(Set<String> locations) {
+    for (String location : locations) {
+      addLocation(location);
+    }
+  }
+
+  /**
+   * Add entry to consistent hash ring.
+   *
+   * @param location Node to add to the ring.
+   */
+  public void addLocation(String location) {
+    addLocation(location, 100);
+  }
+
+  /**
+   * Add entry to consistent hash ring.
+   *
+   * @param location Node to add to the ring.
+   * @param numVirtualNodes Number of virtual nodes to add.
+   */
+  public void addLocation(String location, int numVirtualNodes) {
+    writeLock.lock();
+    try {
+      entryToVirtualNodes.put(location, numVirtualNodes);
+      for (int i = 0; i < numVirtualNodes; i++) {
+        String key = String.format(VIRTUAL_NODE_FORMAT, location, i);
+        String hash = getHash(key);
+        ring.put(hash, key);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove specified entry from hash ring.
+   *
+   * @param location Node to remove from the ring.
+   */
+  public void removeLocation(String location) {
+    writeLock.lock();
+    try {
+      Integer numVirtualNodes = entryToVirtualNodes.remove(location);
+      if (numVirtualNodes == null) {
+        // Unknown location; nothing to remove
+        return;
+      }
+      for (int i = 0; i < numVirtualNodes; i++) {
+        String key = String.format(VIRTUAL_NODE_FORMAT, location, i);
+        String hash = getHash(key);
+        ring.remove(hash);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Return location (owner) of specified item. Owner is the next
+   * entry on the hash ring (with a hash value > hash value of item).
+   *
+   * @param item Item to look for.
+   * @return The location of the item.
+   */
+  public String getLocation(String item) {
+    readLock.lock();
+    try {
+      if (ring.isEmpty()) {
+        return null;
+      }
+      String hash = getHash(item);
+      if (!ring.containsKey(hash)) {
+        SortedMap<String, String> tailMap = ring.tailMap(hash);
+        hash = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
+      }
+      String virtualNode = ring.get(hash);
+      int index = virtualNode.lastIndexOf(SEPARATOR);
+      if (index >= 0) {
+        return virtualNode.substring(0, index);
+      } else {
+        return virtualNode;
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public String getHash(String key) {
+    return MD5Hash.digest(key).toString();
+  }
+
+  /**
+   * Get the locations in the ring.
+   *
+   * @return Set of locations in the ring.
+   */
+  public Set<String> getLocations() {
+    return entryToVirtualNodes.keySet();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java
new file mode 100644
index 0000000000..71496753bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
+ * The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Assorted utility classes and helpers for HDFS Federation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.utils;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index 8e9c7af01b..9dfd1b7ccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -88,7 +88,8 @@ public RouterAdmin(Configuration conf) {
   public void printUsage() {
     String usage = "Federation Admin Tools:\n"
         + "\t[-add <source> <nameservice> <destination> "
-        + "[-readonly] -owner <owner> -group <group> -mode <mode>]\n"
+        + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+        + "-owner <owner> -group <group> -mode <mode>]\n"
         + "\t[-rm <source>]\n"
         + "\t[-ls <path>]\n"
         + "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
index 2b285db6cf..b0d69821da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
@@ -129,6 +129,7 @@ message MountTableRecordProto {
     HASH = 0;
     LOCAL = 1;
     RANDOM = 2;
+    HASH_ALL = 3;
   }
   optional DestOrder destOrder = 6 [default = HASH];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
index f530fe99c4..e38443ed75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
@@ -497,9 +497,9 @@ public void testLocationCache() throws Exception {
         map2);
     entries.add(entry2);
     mountTable.refreshEntries(entries);
-    assertEquals("1->/testlocationcache/",
+    assertEquals("1->/testlocationcache",
         mountTable.getDestinationForPath("/testlocationcache").toString());
-    assertEquals("2->/anothertestlocationcache/",
+    assertEquals("2->/anothertestlocationcache",
         mountTable.getDestinationForPath("/anothertestlocationcache")
             .toString());
@@ -518,7 +518,7 @@ public void testLocationCache() throws Exception {
     mountTable.refreshEntries(entries);
 
     // Ensure location cache update correctly
-    assertEquals("3->/testlocationcache/",
+    assertEquals("3->/testlocationcache",
         mountTable.getDestinationForPath("/testlocationcache").toString());
 
     // Cleanup before exit
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
new file mode 100644
index 0000000000..3915c564bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver.extractTempFileName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the multiple destination resolver.
+ */ +public class TestMultipleDestinationResolver { + + private MultipleDestinationMountTableResolver resolver; + + @Before + public void setup() throws IOException { + Configuration conf = new Configuration(); + resolver = new MultipleDestinationMountTableResolver(conf, null); + + // We manually point /tmp to only subcluster0 + Map map1 = new HashMap<>(); + map1.put("subcluster0", "/tmp"); + resolver.addEntry(MountTable.newInstance("/tmp", map1)); + + // We manually point / to subcluster0,1,2 with default order (hash) + Map mapDefault = new HashMap<>(); + mapDefault.put("subcluster0", "/"); + mapDefault.put("subcluster1", "/"); + mapDefault.put("subcluster2", "/"); + MountTable defaultEntry = MountTable.newInstance("/", mapDefault); + resolver.addEntry(defaultEntry); + + // We manually point /hash to subcluster0,1,2 with hashing + Map mapHash = new HashMap<>(); + mapHash.put("subcluster0", "/hash"); + mapHash.put("subcluster1", "/hash"); + mapHash.put("subcluster2", "/hash"); + MountTable hashEntry = MountTable.newInstance("/hash", mapHash); + hashEntry.setDestOrder(DestinationOrder.HASH); + resolver.addEntry(hashEntry); + + // We manually point /hashall to subcluster0,1,2 with hashing (full tree) + Map mapHashAll = new HashMap<>(); + mapHashAll.put("subcluster0", "/hashall"); + mapHashAll.put("subcluster1", "/hashall"); + mapHashAll.put("subcluster2", "/hashall"); + MountTable hashEntryAll = MountTable.newInstance("/hashall", mapHashAll); + hashEntryAll.setDestOrder(DestinationOrder.HASH_ALL); + resolver.addEntry(hashEntryAll); + + // We point /local to subclusters 0, 1, 2 with the local order + Map mapLocal = new HashMap<>(); + mapLocal.put("subcluster0", "/local"); + mapLocal.put("subcluster1", "/local"); + mapLocal.put("subcluster2", "/local"); + MountTable localEntry = MountTable.newInstance("/local", mapLocal); + localEntry.setDestOrder(DestinationOrder.LOCAL); + resolver.addEntry(localEntry); + + // We point /random to subclusters 0, 1, 2 with the random order + Map mapRandom = new HashMap<>(); + mapRandom.put("subcluster0", "/random"); + mapRandom.put("subcluster1", "/random"); + mapRandom.put("subcluster2", "/random"); + MountTable randomEntry = MountTable.newInstance("/random", mapRandom); + randomEntry.setDestOrder(DestinationOrder.RANDOM); + resolver.addEntry(randomEntry); + + // Read only mount point + Map mapReadOnly = new HashMap<>(); + mapReadOnly.put("subcluster0", "/readonly"); + mapReadOnly.put("subcluster1", "/readonly"); + mapReadOnly.put("subcluster2", "/readonly"); + MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly); + readOnlyEntry.setReadOnly(true); + resolver.addEntry(readOnlyEntry); + } + + @Test + public void testHashEqualDistribution() throws IOException { + // First level + testEvenDistribution("/hash"); + testEvenDistribution("/hash/folder0", false); + + // All levels + testEvenDistribution("/hashall"); + testEvenDistribution("/hashall/folder0"); + } + + @Test + public void testHashAll() throws IOException { + // Files should be spread across subclusters + PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt"); + assertDest("subcluster0", dest0); + PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt"); + assertDest("subcluster1", dest1); + + // Files within folder should be spread across subclusters + PathLocation dest2 = resolver.getDestinationForPath("/hashall/folder0"); + assertDest("subcluster2", dest2); + PathLocation dest3 = resolver.getDestinationForPath( + 
"/hashall/folder0/file0.txt"); + assertDest("subcluster1", dest3); + PathLocation dest4 = resolver.getDestinationForPath( + "/hashall/folder0/file1.txt"); + assertDest("subcluster0", dest4); + + PathLocation dest5 = resolver.getDestinationForPath( + "/hashall/folder0/folder0/file0.txt"); + assertDest("subcluster1", dest5); + PathLocation dest6 = resolver.getDestinationForPath( + "/hashall/folder0/folder0/file1.txt"); + assertDest("subcluster1", dest6); + PathLocation dest7 = resolver.getDestinationForPath( + "/hashall/folder0/folder0/file2.txt"); + assertDest("subcluster0", dest7); + + PathLocation dest8 = resolver.getDestinationForPath("/hashall/folder1"); + assertDest("subcluster1", dest8); + PathLocation dest9 = resolver.getDestinationForPath( + "/hashall/folder1/file0.txt"); + assertDest("subcluster0", dest9); + PathLocation dest10 = resolver.getDestinationForPath( + "/hashall/folder1/file1.txt"); + assertDest("subcluster1", dest10); + + PathLocation dest11 = resolver.getDestinationForPath("/hashall/folder2"); + assertDest("subcluster2", dest11); + PathLocation dest12 = resolver.getDestinationForPath( + "/hashall/folder2/file0.txt"); + assertDest("subcluster0", dest12); + PathLocation dest13 = resolver.getDestinationForPath( + "/hashall/folder2/file1.txt"); + assertDest("subcluster0", dest13); + PathLocation dest14 = resolver.getDestinationForPath( + "/hashall/folder2/file2.txt"); + assertDest("subcluster1", dest14); + } + + @Test + public void testHashFirst() throws IOException { + PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt"); + assertDest("subcluster0", dest0); + PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt"); + assertDest("subcluster1", dest1); + + // All these must be in the same location: subcluster0 + PathLocation dest2 = resolver.getDestinationForPath("/hash/folder0"); + assertDest("subcluster0", dest2); + PathLocation dest3 = resolver.getDestinationForPath( + "/hash/folder0/file0.txt"); + assertDest("subcluster0", dest3); + PathLocation dest4 = resolver.getDestinationForPath( + "/hash/folder0/file1.txt"); + assertDest("subcluster0", dest4); + + PathLocation dest5 = resolver.getDestinationForPath( + "/hash/folder0/folder0/file0.txt"); + assertDest("subcluster0", dest5); + PathLocation dest6 = resolver.getDestinationForPath( + "/hash/folder0/folder0/file1.txt"); + assertDest("subcluster0", dest6); + + // All these must be in the same location: subcluster2 + PathLocation dest7 = resolver.getDestinationForPath("/hash/folder1"); + assertDest("subcluster2", dest7); + PathLocation dest8 = resolver.getDestinationForPath( + "/hash/folder1/file0.txt"); + assertDest("subcluster2", dest8); + PathLocation dest9 = resolver.getDestinationForPath( + "/hash/folder1/file1.txt"); + assertDest("subcluster2", dest9); + + // All these must be in the same location: subcluster2 + PathLocation dest10 = resolver.getDestinationForPath("/hash/folder2"); + assertDest("subcluster2", dest10); + PathLocation dest11 = resolver.getDestinationForPath( + "/hash/folder2/file0.txt"); + assertDest("subcluster2", dest11); + PathLocation dest12 = resolver.getDestinationForPath( + "/hash/folder2/file1.txt"); + assertDest("subcluster2", dest12); + } + + @Test + public void testRandomEqualDistribution() throws IOException { + testEvenDistribution("/random"); + } + + @Test + public void testSingleDestination() throws IOException { + // All the files in /tmp should be in subcluster0 + for (int f = 0; f < 100; f++) { + String filename = "/tmp/b/c/file" + f + ".txt"; + 
PathLocation destination = resolver.getDestinationForPath(filename); + RemoteLocation loc = destination.getDefaultLocation(); + assertEquals("subcluster0", loc.getNameserviceId()); + assertEquals(filename, loc.getDest()); + } + } + + @Test + public void testResolveSubdirectories() throws Exception { + // Simulate a testdir under a multi-destination mount. + Random r = new Random(); + String testDir = "/sort/testdir" + r.nextInt(); + String file1 = testDir + "/file1" + r.nextInt(); + String file2 = testDir + "/file2" + r.nextInt(); + + // Verify both files resolve to the same namespace as the parent dir. + PathLocation testDirLocation = resolver.getDestinationForPath(testDir); + RemoteLocation defaultLoc = testDirLocation.getDefaultLocation(); + String testDirNamespace = defaultLoc.getNameserviceId(); + + PathLocation file1Location = resolver.getDestinationForPath(file1); + RemoteLocation defaultLoc1 = file1Location.getDefaultLocation(); + assertEquals(testDirNamespace, defaultLoc1.getNameserviceId()); + + PathLocation file2Location = resolver.getDestinationForPath(file2); + RemoteLocation defaultLoc2 = file2Location.getDefaultLocation(); + assertEquals(testDirNamespace, defaultLoc2.getNameserviceId()); + } + + @Test + public void testExtractTempFileName() { + for (String teststring : new String[] { + "testfile1.txt.COPYING", + "testfile1.txt._COPYING_", + "testfile1.txt._COPYING_.attempt_1486662804109_0055_m_000042_0", + "testfile1.txt.tmp", + "_temp/testfile1.txt", + "_temporary/testfile1.txt.af77e2ab-4bc5-4959-ae08-299c880ee6b8", + "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" + + "testfile1.txt" }) { + String finalName = extractTempFileName(teststring); + assertEquals("testfile1.txt", finalName); + } + + // False cases + assertEquals( + "file1.txt.COPYING1", extractTempFileName("file1.txt.COPYING1")); + assertEquals("file1.txt.tmp2", extractTempFileName("file1.txt.tmp2")); + + // Speculation patterns + String finalName = extractTempFileName( + "_temporary/part-00007.af77e2ab-4bc5-4959-ae08-299c880ee6b8"); + assertEquals("part-00007", finalName); + finalName = extractTempFileName( + "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" + + "part-00003"); + assertEquals("part-00003", finalName); + + // Subfolders + finalName = extractTempFileName("folder0/testfile1.txt._COPYING_"); + assertEquals("folder0/testfile1.txt", finalName); + finalName = extractTempFileName( + "folder0/folder1/testfile1.txt._COPYING_"); + assertEquals("folder0/folder1/testfile1.txt", finalName); + finalName = extractTempFileName( + "processedHrsData.txt/_temporary/0/_temporary/" + + "attempt_201706281636_0007_m_000003_46/part-00003"); + assertEquals("processedHrsData.txt/part-00003", finalName); + } + + @Test + public void testReadOnly() throws IOException { + MountTable mount = resolver.getMountPoint("/readonly"); + assertTrue(mount.isReadOnly()); + + PathLocation dest0 = resolver.getDestinationForPath("/readonly/file0.txt"); + assertDest("subcluster1", dest0); + PathLocation dest1 = resolver.getDestinationForPath("/readonly/file1.txt"); + assertDest("subcluster2", dest1); + + // All these must be in the same location: subcluster0 + PathLocation dest2 = resolver.getDestinationForPath("/readonly/folder0"); + assertDest("subcluster1", dest2); + PathLocation dest3 = resolver.getDestinationForPath( + "/readonly/folder0/file0.txt"); + assertDest("subcluster1", dest3); + PathLocation dest4 = resolver.getDestinationForPath( + "/readonly/folder0/file1.txt"); + assertDest("subcluster1", 
dest4); + + PathLocation dest5 = resolver.getDestinationForPath( + "/readonly/folder0/folder0/file0.txt"); + assertDest("subcluster1", dest5); + PathLocation dest6 = resolver.getDestinationForPath( + "/readonly/folder0/folder0/file1.txt"); + assertDest("subcluster1", dest6); + + // All these must be in the same location: subcluster2 + PathLocation dest7 = resolver.getDestinationForPath("/readonly/folder1"); + assertDest("subcluster2", dest7); + PathLocation dest8 = resolver.getDestinationForPath( + "/readonly/folder1/file0.txt"); + assertDest("subcluster2", dest8); + PathLocation dest9 = resolver.getDestinationForPath( + "/readonly/folder1/file1.txt"); + assertDest("subcluster2", dest9); + + // All these must be in the same location: subcluster2 + PathLocation dest10 = resolver.getDestinationForPath("/readonly/folder2"); + assertDest("subcluster1", dest10); + PathLocation dest11 = resolver.getDestinationForPath( + "/readonly/folder2/file0.txt"); + assertDest("subcluster1", dest11); + PathLocation dest12 = resolver.getDestinationForPath( + "/readonly/folder2/file1.txt"); + assertDest("subcluster1", dest12); + } + + @Test + public void testLocalResolver() throws IOException { + PathLocation dest0 = + resolver.getDestinationForPath("/local/folder0/file0.txt"); + assertDest("subcluster0", dest0); + } + + @Test + public void testRandomResolver() throws IOException { + Set destinations = new HashSet<>(); + for (int i = 0; i < 30; i++) { + PathLocation dest = + resolver.getDestinationForPath("/random/folder0/file0.txt"); + RemoteLocation firstDest = dest.getDestinations().get(0); + String nsId = firstDest.getNameserviceId(); + destinations.add(nsId); + } + assertEquals(3, destinations.size()); + } + + /** + * Test that a path has files distributed across destinations evenly. + * @param path Path to check. + * @throws IOException + */ + private void testEvenDistribution(final String path) throws IOException { + testEvenDistribution(path, true); + } + + /** + * Test that a path has files distributed across destinations evenly or not. + * @param path Path to check. + * @param even If the distribution should be even or not. + * @throws IOException If it cannot check it. 
+
+  /**
+   * Test that a path has files distributed across destinations evenly.
+   * @param path Path to check.
+   * @throws IOException If the resolver cannot be queried.
+   */
+  private void testEvenDistribution(final String path) throws IOException {
+    testEvenDistribution(path, true);
+  }
+
+  /**
+   * Test that a path has files distributed across destinations evenly or not.
+   * @param path Path to check.
+   * @param even If the distribution should be even or not.
+   * @throws IOException If it cannot check it.
+   */
+  private void testEvenDistribution(final String path, final boolean even)
+      throws IOException {
+
+    // Subcluster -> Files
+    Map<String, Set<String>> results = new HashMap<>();
+    for (int f = 0; f < 10000; f++) {
+      String filename = path + "/file" + f + ".txt";
+      PathLocation destination = resolver.getDestinationForPath(filename);
+      RemoteLocation loc = destination.getDefaultLocation();
+      assertEquals(filename, loc.getDest());
+
+      String nsId = loc.getNameserviceId();
+      if (!results.containsKey(nsId)) {
+        results.put(nsId, new TreeSet<>());
+      }
+      results.get(nsId).add(filename);
+    }
+
+    if (!even) {
+      // All files should be in one subcluster
+      assertEquals(1, results.size());
+    } else {
+      // Files should be distributed somewhat evenly
+      assertEquals(3, results.size());
+      int count = 0;
+      for (Set<String> files : results.values()) {
+        count = count + files.size();
+      }
+      int avg = count / results.keySet().size();
+      for (Set<String> files : results.values()) {
+        int filesCount = files.size();
+        // Check that the count in each namespace is within 20% of avg
+        assertTrue(filesCount > 0);
+        assertTrue(Math.abs(filesCount - avg) < (avg / 5));
+      }
+    }
+  }
+
+  private static void assertDest(String expectedDest, PathLocation loc) {
+    assertEquals(expectedDest,
+        loc.getDestinations().get(0).getNameserviceId());
+  }
+}
\ No newline at end of file
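[Editor's note] The two behaviors tested above — subdirectories of /sort staying together, while sibling files under a HASH_ALL mount spread almost evenly — both follow from what gets hashed. The sketch below contrasts the two orders using a plain modulo hash as a stand-in for the resolvers' actual consistent-hashing scheme, which it does not reproduce:

import java.util.List;

/**
 * Why subtrees stick together under HASH but files spread under HASH_ALL:
 * HASH keys on the first path level below the mount point, HASH_ALL on
 * the whole remaining path.
 */
final class HashOrderSketch {

  /** HASH: only the first path level under the mount point is hashed. */
  static String hashFirst(List<String> nsIds, String remainingPath) {
    String[] levels = remainingPath.split("/");
    return pick(nsIds, levels.length > 0 ? levels[0] : remainingPath);
  }

  /** HASH_ALL: the whole remaining path is hashed. */
  static String hashAll(List<String> nsIds, String remainingPath) {
    return pick(nsIds, remainingPath);
  }

  private static String pick(List<String> nsIds, String key) {
    int idx = (key.hashCode() & Integer.MAX_VALUE) % nsIds.size();
    return nsIds.get(idx);
  }
}

Under hashFirst, "testdir123/file1" and "testdir123/file2" share the key "testdir123" and land in the same subcluster (testResolveSubdirectories); under hashAll, every file hashes on its own and the 10000-file loop above sees a near-even split (testEvenDistribution).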
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
new file mode 100644
index 0000000000..42ede62936
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the {@link LocalResolver}.
+ */
+public class TestLocalResolver {
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLocalResolver() throws IOException {
+
+    // Mock the subcluster mapping
+    Configuration conf = new Configuration();
+    Router router = mock(Router.class);
+    StateStoreService stateStore = mock(StateStoreService.class);
+    MembershipStore membership = mock(MembershipStore.class);
+    when(router.getStateStore()).thenReturn(stateStore);
+    when(stateStore.getRegisteredRecordStore(any(Class.class)))
+        .thenReturn(membership);
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    // Set the mapping for each client
+    List<MembershipState> records = new LinkedList<>();
+    records.add(newMembershipState("client0", "subcluster0"));
+    records.add(newMembershipState("client1", "subcluster1"));
+    records.add(newMembershipState("client2", "subcluster2"));
+    response.setNamenodeMemberships(records);
+    when(membership.getNamenodeRegistrations(
+        any(GetNamenodeRegistrationsRequest.class))).thenReturn(response);
+
+    // Mock the client resolution: the spy returns whatever sb holds
+    StringBuilder sb = new StringBuilder("clientX");
+    LocalResolver localResolver = new LocalResolver(conf, router);
+    LocalResolver spyLocalResolver = spy(localResolver);
+    doAnswer(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return sb.toString();
+      }
+    }).when(spyLocalResolver).getClientAddr();
+
+    // Add the mocks to the resolver
+    MultipleDestinationMountTableResolver resolver =
+        new MultipleDestinationMountTableResolver(conf, router);
+    resolver.addResolver(DestinationOrder.LOCAL, spyLocalResolver);
+
+    // We point /local to subclusters 0, 1, 2 with the local order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // Test first with the default destination
+    PathLocation dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+
+    // We change the client location and verify
+    setClient(sb, "client2");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster2", dest);
+
+    setClient(sb, "client1");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster1", dest);
+
+    setClient(sb, "client0");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+  }
+
+  private void assertDestination(String expectedNsId, PathLocation loc) {
+    List<RemoteLocation> dests = loc.getDestinations();
+    RemoteLocation dest = dests.get(0);
+    assertEquals(expectedNsId, dest.getNameserviceId());
+  }
+
+  private MembershipState newMembershipState(String addr, String nsId) {
+    return MembershipState.newInstance(
+        "routerId", nsId, "nn0", "cluster0", "blockPool0",
+        addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004",
+        FederationNamenodeServiceState.ACTIVE, false);
+  }
+
+  /**
+   * Set the address of the client issuing the request. We use a StringBuilder
+   * to modify the value in place for the mock.
+   * @param sb StringBuilder to set the client string.
+   * @param client Address of the client.
+   */
+  private static void setClient(StringBuilder sb, String client) {
+    sb.replace(0, sb.length(), client);
+  }
+}
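[Editor's note] The test above encodes the LOCAL contract: membership records tie a namenode RPC address ("client0:8001") to a nameservice, and the resolver prefers the destination whose namenode shares a host with the caller. The sketch below restates that lookup with hypothetical helper names; the record fields and fallback policy are assumptions, not the actual LocalResolver internals:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the LOCAL-order lookup: build host -> nameservice from the
 * membership records, then prefer the entry matching the caller's host.
 */
final class LocalOrderSketch {

  static Map<String, String> hostToNameservice(
      Map<String, String> rpcAddrToNsId) {
    Map<String, String> byHost = new HashMap<>();
    for (Map.Entry<String, String> e : rpcAddrToNsId.entrySet()) {
      // "client0:8001" -> "client0"
      String host = e.getKey().split(":")[0];
      byHost.put(host, e.getValue());
    }
    return byHost;
  }

  /** Prefer the caller's local subcluster; otherwise keep the default. */
  static String resolve(String clientHost, Map<String, String> byHost,
      List<String> destinations) {
    String local = byHost.get(clientHost);
    return local != null && destinations.contains(local)
        ? local : destinations.get(0);
  }
}

This also explains the first assertion: "clientX" is not in the mapping, so the default (first) destination, subcluster0, is returned.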
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
index 3271d56c43..a8ffded3dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.util.Time;
@@ -172,6 +174,52 @@ public void testAddReadOnlyMountTable() throws IOException {
     MountTable record = getMountTableEntry("/readonly");
     assertEquals("/readonly", record.getSourcePath());
     assertTrue(record.isReadOnly());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance("/readonly");
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
+  }
+
+  @Test
+  public void testAddOrderMountTable() throws IOException {
+    testAddOrderMountTable(DestinationOrder.HASH);
+    testAddOrderMountTable(DestinationOrder.LOCAL);
+    testAddOrderMountTable(DestinationOrder.RANDOM);
+    testAddOrderMountTable(DestinationOrder.HASH_ALL);
+  }
+
+  private void testAddOrderMountTable(final DestinationOrder order)
+      throws IOException {
+    final String mnt = "/" + order;
+    MountTable newEntry = MountTable.newInstance(
+        mnt, Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    newEntry.setDestOrder(order);
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Add
+    AddMountTableEntryRequest addRequest;
+    AddMountTableEntryResponse addResponse;
+    addRequest = AddMountTableEntryRequest.newInstance(newEntry);
+    addResponse = mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // Check that the new entry is stored with the expected order
+    MountTable record = getMountTableEntry(mnt);
+    assertEquals(mnt, record.getSourcePath());
+    assertEquals(order, record.getDestOrder());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance(mnt);
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
   }
 
   @Test
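[Editor's note] The test cycles through all four DestinationOrder values, and the resolver under test registers one order-specific resolver per value (the `resolver.addResolver(DestinationOrder.LOCAL, ...)` call in TestLocalResolver shows the registration API). A sketch of that dispatch shape, with illustrative stand-in names (Order, OrderPicker) for DestinationOrder and the per-order resolver interface:

import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** The four orders exercised by the test above. */
enum Order { HASH, LOCAL, RANDOM, HASH_ALL }

/** Picks a nameservice for a path; implementations vary per order. */
interface OrderPicker {
  String pick(String path, List<String> nsIds);
}

/**
 * Sketch of order-based dispatch: one picker per order, kept in an
 * EnumMap and consulted when a mount entry has multiple destinations.
 */
final class OrderDispatchSketch {
  private final Map<Order, OrderPicker> pickers = new EnumMap<>(Order.class);

  void addResolver(Order order, OrderPicker picker) {
    pickers.put(order, picker);
  }

  String resolve(Order order, String path, List<String> nsIds) {
    OrderPicker picker = pickers.get(order);
    // Fall back to the first destination when no picker is registered.
    return picker != null ? picker.pick(path, nsIds) : nsIds.get(0);
  }
}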
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 161e613acf..20353c3c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
+
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -32,7 +33,9 @@
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@@ -48,6 +51,7 @@
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
+
 /**
  * Tests Router admin commands.
  */
@@ -141,6 +145,36 @@ public void testAddMountTable() throws Exception {
     assertTrue(mountTable.isReadOnly());
   }
 
+  @Test
+  public void testAddOrderMountTable() throws Exception {
+    testAddOrderMountTable(DestinationOrder.HASH);
+    testAddOrderMountTable(DestinationOrder.LOCAL);
+    testAddOrderMountTable(DestinationOrder.RANDOM);
+    testAddOrderMountTable(DestinationOrder.HASH_ALL);
+  }
+
+  private void testAddOrderMountTable(DestinationOrder order)
+      throws Exception {
+    final String mnt = "/" + order;
+    final String nsId = "ns0,ns1";
+    final String dest = "/";
+    String[] argv = new String[] {
+        "-add", mnt, nsId, dest, "-order", order.toString()};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    // Check the state in the State Store
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    MountTableManager mountTable = client.getMountTableManager();
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mnt);
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    List<MountTable> entries = response.getEntries();
+    assertEquals(1, entries.size());
+    assertEquals(2, entries.get(0).getDestinations().size());
+    assertEquals(order, entries.get(0).getDestOrder());
+  }
+
   @Test
   public void testListMountTable() throws Exception {
     String nsId = "ns0";