diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java index 8dd73ecaff..e31077e019 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java @@ -21,6 +21,7 @@ import java.util.EnumMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver; import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver; @@ -77,6 +78,8 @@ public MultipleDestinationMountTableResolver( addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router)); addResolver(DestinationOrder.RANDOM, new RandomResolver()); addResolver(DestinationOrder.HASH_ALL, new HashResolver()); + addResolver(DestinationOrder.SPACE, + new AvailableSpaceResolver(conf, router)); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java new file mode 100644 index 0000000000..77a35a47ed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Order the destinations based on available space. This resolver uses a + * higher probability (instead of "always") to choose the cluster with higher + * available space. + */ +public class AvailableSpaceResolver + extends RouterResolver { + + private static final Logger LOG = LoggerFactory + .getLogger(AvailableSpaceResolver.class); + + /** Increases chance of files on subcluster with more available space. */ + public static final String BALANCER_PREFERENCE_KEY = + RBFConfigKeys.FEDERATION_ROUTER_PREFIX + + "available-space-resolver.balanced-space-preference-fraction"; + public static final float BALANCER_PREFERENCE_DEFAULT = 0.6f; + + /** Random instance used in the subcluster comparison. */ + private static final Random RAND = new Random(); + + /** Customized comparator for SubclusterAvailableSpace. */ + private SubclusterSpaceComparator comparator; + + public AvailableSpaceResolver(final Configuration conf, + final Router routerService) { + super(conf, routerService); + float balancedPreference = conf.getFloat(BALANCER_PREFERENCE_KEY, + BALANCER_PREFERENCE_DEFAULT); + if (balancedPreference < 0.5) { + LOG.warn("The balancer preference value is less than 0.5. That means more" + + " files will be allocated in cluster with lower available space."); + } + + this.comparator = new SubclusterSpaceComparator(balancedPreference); + } + + /** + * Get the mapping from NamespaceId to subcluster space info. It gets this + * mapping from the subclusters through expensive calls (e.g., RPC) and uses + * caching to avoid too many calls. The cache might be updated asynchronously + * to reduce latency. + * + * @return NamespaceId -> {@link SubclusterAvailableSpace} + */ + @Override + protected Map getSubclusterInfo( + MembershipStore membershipStore) { + Map mapping = new HashMap<>(); + try { + // Get the Namenode's available space info from the subclusters + // from the Membership store. + GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest + .newInstance(); + GetNamenodeRegistrationsResponse response = membershipStore + .getNamenodeRegistrations(request); + final List nns = response.getNamenodeMemberships(); + for (MembershipState nn : nns) { + try { + String nsId = nn.getNameserviceId(); + long availableSpace = nn.getStats().getAvailableSpace(); + mapping.put(nsId, new SubclusterAvailableSpace(nsId, availableSpace)); + } catch (Exception e) { + LOG.error("Cannot get stats info for {}: {}.", nn, e.getMessage()); + } + } + } catch (IOException ioe) { + LOG.error("Cannot get Namenodes from the State Store.", ioe); + } + return mapping; + } + + @Override + protected String chooseFirstNamespace(String path, PathLocation loc) { + Map subclusterInfo = + getSubclusterMapping(); + List subclusterList = new LinkedList<>( + subclusterInfo.values()); + Collections.sort(subclusterList, comparator); + + return subclusterList.size() > 0 ? subclusterList.get(0).getNameserviceId() + : null; + } + + /** + * Inner class that stores cluster available space info. + */ + static class SubclusterAvailableSpace { + private final String nsId; + private final long availableSpace; + + SubclusterAvailableSpace(String nsId, long availableSpace) { + this.nsId = nsId; + this.availableSpace = availableSpace; + } + + public String getNameserviceId() { + return this.nsId; + } + + public long getAvailableSpace() { + return this.availableSpace; + } + } + + /** + * Customized comparator for SubclusterAvailableSpace. If more available + * space the one cluster has, the higher priority it will have. But this + * is not absolute, there is a balanced preference to make this use a higher + * probability (instead of "always") to compare by this way. + */ + static final class SubclusterSpaceComparator + implements Comparator, Serializable { + private int balancedPreference; + + SubclusterSpaceComparator(float balancedPreference) { + Preconditions.checkArgument( + balancedPreference <= 1 && balancedPreference >= 0, + "The balancer preference value should be in the range 0.0 - 1.0"); + + this.balancedPreference = (int) (100 * balancedPreference); + } + + @Override + public int compare(SubclusterAvailableSpace cluster1, + SubclusterAvailableSpace cluster2) { + int ret = cluster1.getAvailableSpace() > cluster2.getAvailableSpace() ? -1 + : 1; + + if (ret < 0) { + return (RAND.nextInt(100) < balancedPreference) ? -1 : 1; + } else { + return (RAND.nextInt(100) < balancedPreference) ? 1 : -1; + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java index 03e68e5aaa..99c5e22d12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java @@ -26,5 +26,6 @@ public enum DestinationOrder { HASH, // Follow consistent hashing in the first folder level LOCAL, // Local first RANDOM, // Random order - HASH_ALL // Follow consistent hashing + HASH_ALL, // Follow consistent hashing + SPACE // Available space based order } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java index 4d76c8990c..b6bd4b3b8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hdfs.server.federation.resolver.order; -import static org.apache.hadoop.util.Time.monotonicNow; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.net.HostAndPort; - import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -30,17 +25,14 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; -import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.Router; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; @@ -50,40 +42,46 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; + /** * The local subcluster (where the writer is) should be tried first. The writer * is defined from the RPC query received in the RPC server. */ -public class LocalResolver implements OrderedResolver { +public class LocalResolver extends RouterResolver { private static final Logger LOG = LoggerFactory.getLogger(LocalResolver.class); - /** Configuration key to set the minimum time to update the local cache.*/ - public static final String MIN_UPDATE_PERIOD_KEY = - RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period"; - /** 10 seconds by default. */ - private static final long MIN_UPDATE_PERIOD_DEFAULT = - TimeUnit.SECONDS.toMillis(10); - - - /** Router service. */ - private final Router router; - /** Minimum update time. */ - private final long minUpdateTime; - - /** Node IP -> Subcluster. */ - private Map nodeSubcluster = null; - /** Last time the subcluster map was updated. */ - private long lastUpdated; - - public LocalResolver(final Configuration conf, final Router routerService) { - this.minUpdateTime = conf.getTimeDuration( - MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT, - TimeUnit.MILLISECONDS); - this.router = routerService; + super(conf, routerService); + } + + /** + * Get the mapping from nodes to subcluster. It gets this mapping from the + * subclusters through expensive calls (e.g., RPC) and uses caching to avoid + * too many calls. The cache might be updated asynchronously to reduce + * latency. + * + * @return Node IP -> Subcluster. + */ + @Override + protected Map getSubclusterInfo( + MembershipStore membershipStore) { + Map mapping = new HashMap<>(); + + Map dnSubcluster = getDatanodesSubcluster(); + if (dnSubcluster != null) { + mapping.putAll(dnSubcluster); + } + + Map nnSubcluster = getNamenodesSubcluster(membershipStore); + if (nnSubcluster != null) { + mapping.putAll(nnSubcluster); + } + return mapping; } /** @@ -98,12 +96,12 @@ public LocalResolver(final Configuration conf, final Router routerService) { * @return Local name space. Null if we don't know about this machine. */ @Override - public String getFirstNamespace(final String path, final PathLocation loc) { + protected String chooseFirstNamespace(String path, PathLocation loc) { String localSubcluster = null; String clientAddr = getClientAddr(); - Map nodeToSubcluster = getSubclusterMappings(); - if (nodeToSubcluster != null) { - localSubcluster = nodeToSubcluster.get(clientAddr); + Map subclusterInfo = getSubclusterMapping(); + if (subclusterInfo != null) { + localSubcluster = subclusterInfo.get(clientAddr); if (localSubcluster != null) { LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster); } else { @@ -121,52 +119,6 @@ String getClientAddr() { return Server.getRemoteAddress(); } - /** - * Get the mapping from nodes to subcluster. It gets this mapping from the - * subclusters through expensive calls (e.g., RPC) and uses caching to avoid - * too many calls. The cache might be updated asynchronously to reduce - * latency. - * - * @return Node IP -> Subcluster. - */ - @VisibleForTesting - synchronized Map getSubclusterMappings() { - if (nodeSubcluster == null || - (monotonicNow() - lastUpdated) > minUpdateTime) { - // Fetch the mapping asynchronously - Thread updater = new Thread(new Runnable() { - @Override - public void run() { - Map mapping = new HashMap<>(); - - Map dnSubcluster = getDatanodesSubcluster(); - if (dnSubcluster != null) { - mapping.putAll(dnSubcluster); - } - - Map nnSubcluster = getNamenodesSubcluster(); - if (nnSubcluster != null) { - mapping.putAll(nnSubcluster); - } - nodeSubcluster = mapping; - lastUpdated = monotonicNow(); - } - }); - updater.start(); - - // Wait until initialized - if (nodeSubcluster == null) { - try { - LOG.debug("Wait to get the mapping for the first time"); - updater.join(); - } catch (InterruptedException e) { - LOG.error("Cannot wait for the updater to finish"); - } - } - } - return nodeSubcluster; - } - /** * Get the Datanode mapping from the subclusters from the Namenodes. This * needs to be done as a privileged action to use the user for the Router and @@ -221,14 +173,8 @@ public Map run() { * * @return NN IP -> Subcluster. */ - private Map getNamenodesSubcluster() { - - final MembershipStore membershipStore = getMembershipStore(); - if (membershipStore == null) { - LOG.error("Cannot access the Membership store"); - return null; - } - + private Map getNamenodesSubcluster( + MembershipStore membershipStore) { // Manage requests from this hostname (127.0.0.1) String localIp = "127.0.0.1"; String localHostname = localIp; @@ -269,29 +215,4 @@ private Map getNamenodesSubcluster() { } return ret; } - - /** - * Get the Router RPC server. - * - * @return Router RPC server. Null if not possible. - */ - private RouterRpcServer getRpcServer() { - if (this.router == null) { - return null; - } - return router.getRpcServer(); - } - - /** - * Get the Membership store. - * - * @return Membership store. - */ - private MembershipStore getMembershipStore() { - StateStoreService stateStore = router.getStateStore(); - if (stateStore == null) { - return null; - } - return stateStore.getRegisteredRecordStore(MembershipStore.class); - } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java new file mode 100644 index 0000000000..91af1ca06a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The order resolver that depends upon the Router service. + * + * @param The key type of subcluster mapping info queried from Router. + * @param The value type of subcluster mapping info queried from Router. + */ +public abstract class RouterResolver implements OrderedResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterResolver.class); + + /** Configuration key to set the minimum time to update subcluster info. */ + public static final String MIN_UPDATE_PERIOD_KEY = + RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "router-resolver.update-period"; + /** 10 seconds by default. */ + private static final long MIN_UPDATE_PERIOD_DEFAULT = TimeUnit.SECONDS + .toMillis(10); + + /** Router service. */ + private final Router router; + /** Minimum update time. */ + private final long minUpdateTime; + + /** K -> T template mapping. */ + private Map subclusterMapping = null; + /** Last time the subcluster mapping was updated. */ + private long lastUpdated; + + public RouterResolver(final Configuration conf, final Router routerService) { + this.minUpdateTime = conf.getTimeDuration(MIN_UPDATE_PERIOD_KEY, + MIN_UPDATE_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); + this.router = routerService; + } + + @Override + public String getFirstNamespace(String path, PathLocation loc) { + updateSubclusterMapping(); + return chooseFirstNamespace(path, loc); + } + + /** + * The implementation for getting desired subcluster mapping info. + * + * @param membershipStore Membership store the resolver queried from. + * @return The map of desired type info. + */ + protected abstract Map getSubclusterInfo( + MembershipStore membershipStore); + + /** + * Choose the first namespace from queried subcluster mapping info. + * + * @param path Path to check. + * @param loc Federated location with multiple destinations. + * @return First namespace out of the locations. + */ + protected abstract String chooseFirstNamespace(String path, PathLocation loc); + + /** + * Update mapping info periodically. + */ + private synchronized void updateSubclusterMapping() { + if (subclusterMapping == null + || (monotonicNow() - lastUpdated) > minUpdateTime) { + // Fetch the mapping asynchronously + Thread updater = new Thread(new Runnable() { + @Override + public void run() { + final MembershipStore membershipStore = getMembershipStore(); + if (membershipStore == null) { + LOG.error("Cannot access the Membership store."); + return; + } + + subclusterMapping = getSubclusterInfo(membershipStore); + lastUpdated = monotonicNow(); + } + }); + updater.start(); + + // Wait until initialized + if (subclusterMapping == null) { + try { + LOG.debug("Wait to get the mapping for the first time"); + updater.join(); + } catch (InterruptedException e) { + LOG.error("Cannot wait for the updater to finish"); + } + } + } + } + + /** + * Get the Router RPC server. + * + * @return Router RPC server. Null if not possible. + */ + protected RouterRpcServer getRpcServer() { + if (this.router == null) { + return null; + } + return router.getRpcServer(); + } + + /** + * Get the Membership store. + * + * @return Membership store. + */ + protected MembershipStore getMembershipStore() { + StateStoreService stateStore = router.getStateStore(); + if (stateStore == null) { + return null; + } + return stateStore.getRegisteredRecordStore(MembershipStore.class); + } + + /** + * Get subcluster mapping info. + * + * @return The map of subcluster info. + */ + protected Map getSubclusterMapping() { + return this.subclusterMapping; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index b3f677d80a..eaa39515fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -534,31 +534,20 @@ public HdfsFileStatus create(String src, FsPermission masked, } /** - * Get the permissions for the parent of a child with given permissions. If - * the child has r--, we will set it to r-x. + * Get the permissions for the parent of a child with given permissions. + * Add implicit u+wx permission for parent. This is based on + * @{FSDirMkdirOp#addImplicitUwx}. * @param mask The permission mask of the child. * @return The permission mask of the parent. */ private static FsPermission getParentPermission(final FsPermission mask) { FsPermission ret = new FsPermission( - applyExecute(mask.getUserAction()), - applyExecute(mask.getGroupAction()), - applyExecute(mask.getOtherAction())); + mask.getUserAction().or(FsAction.WRITE_EXECUTE), + mask.getGroupAction(), + mask.getOtherAction()); return ret; } - /** - * Apply the execute permissions if it can be read. - * @param action Input permission. - * @return Output permission. - */ - private static FsAction applyExecute(final FsAction action) { - if (action.and(FsAction.READ) == FsAction.READ) { - return action.or(FsAction.EXECUTE); - } - return action; - } - /** * Get the location to create a file. It checks if the file already existed * in one of the locations. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java index f8fec87c60..60496efe47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -421,7 +421,8 @@ public boolean equals(Object obj) { public boolean isAll() { DestinationOrder order = getDestOrder(); return order == DestinationOrder.HASH_ALL || - order == DestinationOrder.RANDOM; + order == DestinationOrder.RANDOM || + order == DestinationOrder.SPACE; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java index 9667489165..48f93bc2ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java @@ -297,6 +297,8 @@ private DestinationOrder convert(DestOrder order) { return DestinationOrder.RANDOM; case HASH_ALL: return DestinationOrder.HASH_ALL; + case SPACE: + return DestinationOrder.SPACE; default: return DestinationOrder.HASH; } @@ -310,6 +312,8 @@ private DestOrder convert(DestinationOrder order) { return DestOrder.RANDOM; case HASH_ALL: return DestOrder.HASH_ALL; + case SPACE: + return DestOrder.SPACE; default: return DestOrder.HASH; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto index b0d69821da..b6a0b129bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto @@ -130,6 +130,7 @@ message MountTableRecordProto { LOCAL = 1; RANDOM = 2; HASH_ALL = 3; + SPACE = 4; } optional DestOrder destOrder = 6 [default = HASH]; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java new file mode 100644 index 0000000000..dfbdf514b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace; +import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterSpaceComparator; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MembershipStatsPBImpl; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Test; + +/** + * Test the {@link AvailableSpaceResolver}. + */ +public class TestAvailableSpaceResolver { + + private static final int SUBCLUSTER_NUM = 10; + + @Test + public void testResolverWithNoPreference() throws IOException { + MultipleDestinationMountTableResolver mountTableResolver = + mockAvailableSpaceResolver(1.0f); + // Since we don't have any preference, it will + // always chose the maximum-available-space subcluster. + PathLocation loc = mountTableResolver.getDestinationForPath("/space"); + assertEquals("subcluster9", + loc.getDestinations().get(0).getNameserviceId()); + + loc = mountTableResolver.getDestinationForPath("/space/subdir"); + assertEquals("subcluster9", + loc.getDestinations().get(0).getNameserviceId()); + } + + @Test + public void testResolverWithDefaultPreference() throws IOException { + MultipleDestinationMountTableResolver mountTableResolver = + mockAvailableSpaceResolver(BALANCER_PREFERENCE_DEFAULT); + + int retries = 10; + int retryTimes = 0; + // There is chance we won't always chose the + // maximum-available-space subcluster. + for (retryTimes = 0; retryTimes < retries; retryTimes++) { + PathLocation loc = mountTableResolver.getDestinationForPath("/space"); + if (!"subcluster9" + .equals(loc.getDestinations().get(0).getNameserviceId())) { + break; + } + } + assertNotEquals(retries, retryTimes); + } + + /** + * Mock the available space based resolver. + * + * @param balancerPreference The balancer preference for the resolver. + * @throws IOException + * @return MultipleDestinationMountTableResolver instance. + */ + @SuppressWarnings("unchecked") + private MultipleDestinationMountTableResolver mockAvailableSpaceResolver( + float balancerPreference) throws IOException { + Configuration conf = new Configuration(); + conf.setFloat(BALANCER_PREFERENCE_KEY, balancerPreference); + Router router = mock(Router.class); + StateStoreService stateStore = mock(StateStoreService.class); + MembershipStore membership = mock(MembershipStore.class); + when(router.getStateStore()).thenReturn(stateStore); + when(stateStore.getRegisteredRecordStore(any(Class.class))) + .thenReturn(membership); + GetNamenodeRegistrationsResponse response = + GetNamenodeRegistrationsResponse.newInstance(); + // Set the mapping for each client + List records = new LinkedList<>(); + for (int i = 0; i < SUBCLUSTER_NUM; i++) { + records.add(newMembershipState("subcluster" + i, i)); + } + response.setNamenodeMemberships(records); + + when(membership + .getNamenodeRegistrations(any(GetNamenodeRegistrationsRequest.class))) + .thenReturn(response); + + // construct available space resolver + AvailableSpaceResolver resolver = new AvailableSpaceResolver(conf, router); + MultipleDestinationMountTableResolver mountTableResolver = + new MultipleDestinationMountTableResolver(conf, router); + mountTableResolver.addResolver(DestinationOrder.SPACE, resolver); + + // We point /space to subclusters [0,..9] with the SPACE order + Map destinations = new HashMap<>(); + for (int i = 0; i < SUBCLUSTER_NUM; i++) { + destinations.put("subcluster" + i, "/space"); + } + MountTable spaceEntry = MountTable.newInstance("/space", destinations); + spaceEntry.setDestOrder(DestinationOrder.SPACE); + mountTableResolver.addEntry(spaceEntry); + + return mountTableResolver; + } + + public static MembershipState newMembershipState(String nameservice, + long availableSpace) { + MembershipState record = MembershipState.newInstance(); + record.setNameserviceId(nameservice); + + MembershipStats stats = new MembershipStatsPBImpl(); + stats.setAvailableSpace(availableSpace); + record.setStats(stats); + return record; + } + + @Test + public void testSubclusterSpaceComparator() { + verifyRank(0.0f, true, false); + verifyRank(1.0f, true, true); + verifyRank(0.5f, false, false); + verifyRank(BALANCER_PREFERENCE_DEFAULT, false, false); + + // test for illegal cases + try { + verifyRank(2.0f, false, false); + fail("Subcluster comparison should be failed."); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "The balancer preference value should be in the range 0.0 - 1.0", e); + } + + try { + verifyRank(-1.0f, false, false); + fail("Subcluster comparison should be failed."); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "The balancer preference value should be in the range 0.0 - 1.0", e); + } + } + + /** + * Verify result rank with {@link SubclusterSpaceComparator}. + * @param balancerPreference The balancer preference used + * in {@link SubclusterSpaceComparator}. + * @param shouldOrdered The result rank should be ordered. + * @param isDesc If the rank result is in a descending order. + */ + private void verifyRank(float balancerPreference, boolean shouldOrdered, + boolean isDesc) { + List subclusters = new LinkedList<>(); + for (int i = 0; i < SUBCLUSTER_NUM; i++) { + subclusters.add(new SubclusterAvailableSpace("subcluster" + i, i)); + } + + // shuffle the cluster list if we expect rank to be ordered + if (shouldOrdered) { + Collections.shuffle(subclusters); + } + + SubclusterSpaceComparator comparator = new SubclusterSpaceComparator( + balancerPreference); + Collections.sort(subclusters, comparator); + + int i = SUBCLUSTER_NUM - 1; + for (; i >= 0; i--) { + SubclusterAvailableSpace cluster = subclusters + .get(SUBCLUSTER_NUM - 1 - i); + + if (shouldOrdered) { + if (isDesc) { + assertEquals("subcluster" + i, cluster.getNameserviceId()); + assertEquals(i, cluster.getAvailableSpace()); + } else { + assertEquals("subcluster" + (SUBCLUSTER_NUM - 1 - i), + cluster.getNameserviceId()); + assertEquals(SUBCLUSTER_NUM - 1 - i, cluster.getAvailableSpace()); + } + } else { + // If catch one cluster is not in ordered, that's expected behavior. + if (!cluster.getNameserviceId().equals("subcluster" + i) + && cluster.getAvailableSpace() != i) { + break; + } + } + } + + // The var i won't reach to 0 since cluster list won't be completely + // ordered. + if (!shouldOrdered) { + assertNotEquals(0, i); + } + subclusters.clear(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java index 4995de4115..715b627f69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java @@ -60,8 +60,10 @@ public class TestRouterAllResolver { /** Directory that will be in a HASH_ALL mount point. */ private static final String TEST_DIR_HASH_ALL = "/hashall"; - /** Directory that will be in a HASH_ALL mount point. */ + /** Directory that will be in a RANDOM mount point. */ private static final String TEST_DIR_RANDOM = "/random"; + /** Directory that will be in a SPACE mount point. */ + private static final String TEST_DIR_SPACE = "/space"; /** Number of namespaces. */ private static final int NUM_NAMESPACES = 2; @@ -103,6 +105,7 @@ public void setup() throws Exception { // Setup the test mount point createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL); createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM); + createMountTableEntry(TEST_DIR_SPACE, DestinationOrder.SPACE); // Get filesystems for federated and each namespace routerFs = routerContext.getFileSystem(); @@ -135,6 +138,11 @@ public void testRandomAll() throws Exception { testAll(TEST_DIR_RANDOM); } + @Test + public void testSpaceAll() throws Exception { + testAll(TEST_DIR_SPACE); + } + /** * Tests that the resolver spreads files across subclusters in the whole * tree.