From 20a4ec351c51da3459423852abea1d6c0e3097e3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Thu, 23 May 2019 10:09:07 -0700 Subject: [PATCH] HDDS-700. Support rack awared node placement policy based on network topology. Contributed by Sammi Chen. --- .../hadoop/hdds/protocol/DatanodeDetails.java | 37 +- .../scm/container/ReplicationManager.java | 2 +- .../algorithms/ContainerPlacementPolicy.java | 3 +- .../placement/algorithms/SCMCommonPolicy.java | 10 +- .../SCMContainerPlacementCapacity.java | 9 +- .../SCMContainerPlacementRackAware.java | 329 ++++++++++++++++++ .../SCMContainerPlacementRandom.java | 8 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 27 +- .../scm/container/TestReplicationManager.java | 2 +- .../TestSCMContainerPlacementCapacity.java | 4 +- .../TestSCMContainerPlacementRackAware.java | 257 ++++++++++++++ .../TestSCMContainerPlacementRandom.java | 4 +- .../placement/TestContainerPlacement.java | 4 +- 13 files changed, 662 insertions(+), 34 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 1dfeecd47a..be6f44cd41 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.net.NetConstants; +import org.apache.hadoop.hdds.scm.net.NodeImpl; 
import java.util.ArrayList; import java.util.List; @@ -35,9 +37,9 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class DatanodeDetails implements Comparable { - - /** +public class DatanodeDetails extends NodeImpl implements + Comparable { +/** * DataNode's unique identifier in the cluster. */ private final UUID uuid; @@ -47,18 +49,19 @@ public class DatanodeDetails implements Comparable { private List ports; private String certSerialId; - /** * Constructs DatanodeDetails instance. DatanodeDetails.Builder is used * for instantiating DatanodeDetails. * @param uuid DataNode's UUID * @param ipAddress IP Address of this DataNode * @param hostName DataNode's hostname + * @param networkLocation DataNode's network location path * @param ports Ports used by the DataNode * @param certSerialId serial id from SCM issued certificate. */ private DatanodeDetails(String uuid, String ipAddress, String hostName, - List ports, String certSerialId) { + String networkLocation, List ports, String certSerialId) { + super(hostName, networkLocation, NetConstants.NODE_COST_DEFAULT); this.uuid = UUID.fromString(uuid); this.ipAddress = ipAddress; this.hostName = hostName; @@ -67,6 +70,8 @@ private DatanodeDetails(String uuid, String ipAddress, String hostName, } protected DatanodeDetails(DatanodeDetails datanodeDetails) { + super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(), + datanodeDetails.getCost()); this.uuid = datanodeDetails.uuid; this.ipAddress = datanodeDetails.ipAddress; this.hostName = datanodeDetails.hostName; @@ -223,6 +228,8 @@ public String toString() { ipAddress + ", host: " + hostName + + ", networkLocation: " + + getNetworkLocation() + ", certSerialId: " + certSerialId + "}"; } @@ -259,6 +266,7 @@ public static final class Builder { private String id; private String ipAddress; private String hostName; + private String networkLocation; private List ports; private String certSerialId; @@ -303,6 +311,17 @@ public Builder 
setHostName(String host) { return this; } + /** + * Sets the network location of DataNode. + * + * @param loc location + * @return DatanodeDetails.Builder + */ + public Builder setNetworkLocation(String loc) { + this.networkLocation = loc; + return this; + } + /** * Adds a DataNode Port. * @@ -334,9 +353,12 @@ public Builder setCertSerialId(String certId) { */ public DatanodeDetails build() { Preconditions.checkNotNull(id); - return new DatanodeDetails(id, ipAddress, hostName, ports, certSerialId); + if (networkLocation == null) { + networkLocation = NetConstants.DEFAULT_RACK; + } + return new DatanodeDetails(id, ipAddress, hostName, networkLocation, + ports, certSerialId); } - } /** @@ -437,5 +459,4 @@ public String getCertSerialId() { public void setCertSerialId(String certSerialId) { this.certSerialId = certSerialId; } - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index e247e96cd9..a911e5a832 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -484,7 +484,7 @@ private void handleUnderReplicatedContainer(final ContainerInfo container, .getReplicationFactor().getNumber(); final int delta = replicationFactor - getReplicaCount(id, replicas); final List selectedDatanodes = containerPlacement - .chooseDatanodes(source, delta, container.getUsedBytes()); + .chooseDatanodes(source, null, delta, container.getUsedBytes()); LOG.info("Container {} is under replicated. 
Expected replica count" + " is {}, but found {}.", id, replicationFactor, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java index 3336c8e80e..52ce7964b6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java @@ -33,12 +33,13 @@ public interface ContainerPlacementPolicy { * that satisfy the nodes and size requirement. * * @param excludedNodes - list of nodes to be excluded. + * @param favoredNodes - list of nodes preferred. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return list of datanodes chosen. * @throws IOException */ List chooseDatanodes(List excludedNodes, - int nodesRequired, long sizeRequired) + List favoredNodes, int nodesRequired, long sizeRequired) throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java index 9fc47eaf74..c3e3024fb7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -97,6 +97,7 @@ public Configuration getConf() { * * * @param excludedNodes - datanodes with existing replicas + * @param favoredNodes - list of nodes preferred. * @param nodesRequired - number of datanodes required. 
* @param sizeRequired - size required for the container or block. * @return list of datanodes chosen. @@ -104,7 +105,7 @@ public Configuration getConf() { */ @Override public List chooseDatanodes( - List excludedNodes, + List excludedNodes, List favoredNodes, int nodesRequired, final long sizeRequired) throws SCMException { List healthyNodes = nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); @@ -137,7 +138,6 @@ public List chooseDatanodes( throw new SCMException(msg, SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE); } - return healthyList; } @@ -147,8 +147,8 @@ public List chooseDatanodes( * @param datanodeDetails DatanodeDetails * @return true if we have enough space. */ - private boolean hasEnoughSpace(DatanodeDetails datanodeDetails, - long sizeRequired) { + boolean hasEnoughSpace(DatanodeDetails datanodeDetails, + long sizeRequired) { SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails); return (nodeMetric != null) && (nodeMetric.get() != null) && nodeMetric.get().getRemaining().hasResources(sizeRequired); @@ -196,6 +196,4 @@ public List getResultSet( */ public abstract DatanodeDetails chooseNode( List healthyNodes); - - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java index 8df8f6e034..daf8222606 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java @@ -86,6 +86,7 @@ public SCMContainerPlacementCapacity(final NodeManager nodeManager, * * * @param excludedNodes - list of the datanodes to exclude. + * @param favoredNodes - list of nodes preferred. 
* @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return List of datanodes. @@ -93,10 +94,10 @@ public SCMContainerPlacementCapacity(final NodeManager nodeManager, */ @Override public List chooseDatanodes( - List excludedNodes, final int nodesRequired, - final long sizeRequired) throws SCMException { - List healthyNodes = - super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); + List excludedNodes, List favoredNodes, + final int nodesRequired, final long sizeRequired) throws SCMException { + List healthyNodes = super.chooseDatanodes(excludedNodes, + favoredNodes, nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java new file mode 100644 index 0000000000..3758b85888 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetConstants; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.Node; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Container placement policy that choose datanodes with network topology + * awareness, together with the space to satisfy the size constraints. + *

+ * This placement policy complies with the algorithm used in HDFS. With default + * 3 replica, two replica will be on the same rack, the third one will on a + * different rack. + *

+ * This implementation applies to network topology like "/rack/node". Don't + * recommend to use this if the network topology has more layers. + *

+ */ +public final class SCMContainerPlacementRackAware extends SCMCommonPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMContainerPlacementRackAware.class); + private final NetworkTopology networkTopology; + private boolean fallback; + private int RACK_LEVEL = 1; + private int MAX_RETRY= 3; + + /** + * Constructs a Container Placement with rack awareness. + * + * @param nodeManager Node Manager + * @param conf Configuration + * @param fallback Whether reducing constraints to choose a datanode when + * there is no node which satisfies all constraints. + * Basically, false for open container placement, and true + * for closed container placement. + */ + public SCMContainerPlacementRackAware(final NodeManager nodeManager, + final Configuration conf, final NetworkTopology networkTopology, + final boolean fallback) { + super(nodeManager, conf); + this.networkTopology = networkTopology; + this.fallback = fallback; + } + + /** + * Called by SCM to choose datanodes. + * There are two scenarios, one is choosing all nodes for a new pipeline. + * Another is choosing node to meet replication requirement. + * + * + * @param excludedNodes - list of the datanodes to exclude. + * @param favoredNodes - list of nodes preferred. This is a hint to the + * allocator, whether the favored nodes will be used + * depends on whether the nodes meet the allocator's + * requirement. + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * @return List of datanodes. + * @throws SCMException SCMException + */ + @Override + public List chooseDatanodes( + List excludedNodes, List favoredNodes, + int nodesRequired, final long sizeRequired) throws SCMException { + Preconditions.checkArgument(nodesRequired > 0); + + int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT); + int excludedNodesCount = excludedNodes == null ? 
0 : excludedNodes.size(); + if (datanodeCount < nodesRequired + excludedNodesCount) { + throw new SCMException("No enough datanodes to choose.", null); + } + List mutableFavoredNodes = favoredNodes; + // sanity check of favoredNodes + if (mutableFavoredNodes != null && excludedNodes != null) { + mutableFavoredNodes = new ArrayList<>(); + mutableFavoredNodes.addAll(favoredNodes); + mutableFavoredNodes.removeAll(excludedNodes); + } + int favoredNodeNum = mutableFavoredNodes == null? 0 : + mutableFavoredNodes.size(); + + List chosenNodes = new ArrayList<>(); + int favorIndex = 0; + if (excludedNodes == null || excludedNodes.isEmpty()) { + // choose all nodes for a new pipeline case + // choose first datanode from scope ROOT or from favoredNodes if not null + Node favoredNode = favoredNodeNum > favorIndex ? + mutableFavoredNodes.get(favorIndex) : null; + Node firstNode; + if (favoredNode != null) { + firstNode = favoredNode; + favorIndex++; + } else { + firstNode = chooseNode(null, null, sizeRequired); + } + chosenNodes.add(firstNode); + nodesRequired--; + if (nodesRequired == 0) { + return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0])); + } + + // choose second datanode on the same rack as first one + favoredNode = favoredNodeNum > favorIndex ? 
+ mutableFavoredNodes.get(favorIndex) : null; + Node secondNode; + if (favoredNode != null && + networkTopology.isSameParent(firstNode, favoredNode)) { + secondNode = favoredNode; + favorIndex++; + } else { + secondNode = chooseNode(chosenNodes, firstNode, sizeRequired); + } + chosenNodes.add(secondNode); + nodesRequired--; + if (nodesRequired == 0) { + return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0])); + } + + // choose remaining datanodes on different rack as first and second + return chooseNodes(null, chosenNodes, mutableFavoredNodes, favorIndex, + nodesRequired, sizeRequired); + } else { + List mutableExcludedNodes = new ArrayList<>(); + mutableExcludedNodes.addAll(excludedNodes); + // choose node to meet replication requirement + // case 1: one excluded node, choose one on the same rack as the excluded + // node, choose others on different racks. + Node favoredNode; + if (excludedNodes.size() == 1) { + favoredNode = favoredNodeNum > favorIndex ? + mutableFavoredNodes.get(favorIndex) : null; + Node firstNode; + if (favoredNode != null && + networkTopology.isSameParent(excludedNodes.get(0), favoredNode)) { + firstNode = favoredNode; + favorIndex++; + } else { + firstNode = chooseNode(mutableExcludedNodes, excludedNodes.get(0), + sizeRequired); + } + chosenNodes.add(firstNode); + nodesRequired--; + if (nodesRequired == 0) { + return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0])); + } + // choose remaining nodes on different racks + return chooseNodes(null, chosenNodes, mutableFavoredNodes, favorIndex, + nodesRequired, sizeRequired); + } + // case 2: two or more excluded nodes, if these two nodes are + // in the same rack, then choose nodes on different racks, otherwise, + // choose one on the same rack as one of excluded nodes, remaining chosen + // are on different racks. 
+ for(int i = 0; i < excludedNodesCount; i++) { + for (int j = i + 1; j < excludedNodesCount; j++) { + if (networkTopology.isSameParent( + excludedNodes.get(i), excludedNodes.get(j))) { + // choose remaining nodes on different racks + return chooseNodes(mutableExcludedNodes, chosenNodes, + mutableFavoredNodes, favorIndex, nodesRequired, sizeRequired); + } + } + } + // choose one datanode on the same rack as one of the excluded nodes + favoredNode = favoredNodeNum > favorIndex ? + mutableFavoredNodes.get(favorIndex) : null; + Node secondNode; + if (favoredNode != null && networkTopology.isSameParent( + mutableExcludedNodes.get(0), favoredNode)) { + secondNode = favoredNode; + favorIndex++; + } else { + secondNode = + chooseNode(chosenNodes, mutableExcludedNodes.get(0), sizeRequired); + } + chosenNodes.add(secondNode); + mutableExcludedNodes.add(secondNode); + nodesRequired--; + if (nodesRequired == 0) { + return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0])); + } + // choose remaining nodes on different racks + return chooseNodes(mutableExcludedNodes, chosenNodes, mutableFavoredNodes, + favorIndex, nodesRequired, sizeRequired); + } + } + + @Override + public DatanodeDetails chooseNode(List healthyNodes) { + return null; + } + + /** + * Choose a datanode which meets the requirements. If there is no node which + * meets all the requirements, there is a fallback choosing process depending on + * whether fallback is allowed when this class is instantiated. + * + * + * @param excludedNodes - list of the datanodes to be excluded. Can be null. + * @param affinityNode - the chosen nodes should be on the same rack as + * affinityNode. Can be null. + * @param sizeRequired - size required for the container or block. + * @return the chosen datanode. 
+ * @throws SCMException SCMException + */ + private Node chooseNode(List excludedNodes, Node affinityNode, + long sizeRequired) throws SCMException { + int ancestorGen = RACK_LEVEL; + int maxRetry = MAX_RETRY; + while(true) { + Node node = networkTopology.chooseRandom(NetConstants.ROOT, null, + excludedNodes, affinityNode, ancestorGen); + if (node == null) { + // cannot find the node which meets all constraints + LOG.warn("Failed to find the datanode. excludedNodes:" + + (excludedNodes == null ? "" : excludedNodes.toString()) + + ", affinityNode:" + + (affinityNode == null ? "" : affinityNode.getNetworkFullPath())); + if (fallback) { + // fallback, don't consider the affinity node + if (affinityNode != null) { + affinityNode = null; + continue; + } + // fallback, don't consider cross rack + if (ancestorGen == RACK_LEVEL) { + ancestorGen--; + continue; + } + } + // no constraints left to relax, or fallback is not allowed + throw new SCMException("No satisfied datanode to meet the " + + " excludedNodes and affinityNode constrains.", null); + } + if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) { + LOG.debug("Datanode {} is chosen. Required size is {}", + node.toString(), sizeRequired); + return node; + } else { + maxRetry--; + if (maxRetry == 0) { + // avoid the infinite loop + String errMsg = "No satisfied datanode to meet the space constrains. " + + " sizeRequired: " + sizeRequired; + LOG.info(errMsg); + throw new SCMException(errMsg, null); + } + } + } + } + + /** + * Choose a batch of datanodes on different racks than excludedNodes or + * chosenNodes. + * + * + * @param excludedNodes - list of the datanodes to be excluded. Can be null. + * @param chosenNodes - list of nodes already chosen. These nodes should also + * be excluded. Cannot be null. + * @param favoredNodes - list of favoredNodes. It's a hint. Whether the nodes + * are chosen depends on whether they meet the constraints. + * Can be null. 
+ * @param favorIndex - the node index of favoredNodes which is not chosen yet. + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * + * @return List of chosen datanodes. + * @throws SCMException SCMException + */ + private List chooseNodes(List excludedNodes, + List chosenNodes, List favoredNodes, + int favorIndex, int nodesRequired, long sizeRequired) + throws SCMException { + Preconditions.checkArgument(chosenNodes != null); + List excludedNodeList = excludedNodes != null ? + excludedNodes : chosenNodes; + int favoredNodeNum = favoredNodes == null? 0 : favoredNodes.size(); + while(true) { + Node favoredNode = favoredNodeNum > favorIndex ? + favoredNodes.get(favorIndex) : null; + Node chosenNode; + if (favoredNode != null && networkTopology.isSameParent( + excludedNodeList.get(excludedNodeList.size() - 1), favoredNode)) { + chosenNode = favoredNode; + favorIndex++; + } else { + chosenNode = chooseNode(excludedNodeList, null, sizeRequired); + } + excludedNodeList.add(chosenNode); + if (excludedNodeList != chosenNodes) { + chosenNodes.add(chosenNode); + } + nodesRequired--; + if (nodesRequired == 0) { + return Arrays.asList(chosenNodes.toArray(new DatanodeDetails[0])); + } + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java index a70f633e82..48b6139442 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java @@ -58,6 +58,7 @@ public SCMContainerPlacementRandom(final NodeManager 
nodeManager, * * * @param excludedNodes - list of the datanodes to exclude. + * @param favoredNodes - list of nodes preferred. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return List of Datanodes. @@ -65,10 +66,11 @@ public SCMContainerPlacementRandom(final NodeManager nodeManager, */ @Override public List chooseDatanodes( - List excludedNodes, final int nodesRequired, - final long sizeRequired) throws SCMException { + List excludedNodes, List favoredNodes, + final int nodesRequired, final long sizeRequired) throws SCMException { List healthyNodes = - super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); + super.chooseDatanodes(excludedNodes, favoredNodes, nodesRequired, + sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index d61924a91e..b1dd77e554 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -96,6 +96,22 @@ public static DatanodeDetails randomDatanodeDetails() { return createDatanodeDetails(UUID.randomUUID()); } + /** + * Creates DatanodeDetails with random UUID, specific hostname and network + * location. + * + * @return DatanodeDetails + */ + public static DatanodeDetails createDatanodeDetails(String hostname, + String loc) { + String ipAddress = random.nextInt(256) + + "." + random.nextInt(256) + + "." + random.nextInt(256) + + "." + random.nextInt(256); + return createDatanodeDetails(UUID.randomUUID().toString(), hostname, + ipAddress, loc); + } + /** * Creates DatanodeDetails using the given UUID. * @@ -108,7 +124,8 @@ public static DatanodeDetails createDatanodeDetails(UUID uuid) { + "." + random.nextInt(256) + "." 
+ random.nextInt(256) + "." + random.nextInt(256); - return createDatanodeDetails(uuid.toString(), "localhost", ipAddress); + return createDatanodeDetails(uuid.toString(), "localhost", ipAddress, + null); } /** @@ -121,7 +138,8 @@ public static DatanodeDetails createDatanodeDetails(UUID uuid) { public static DatanodeDetails getDatanodeDetails( RegisteredCommand registeredCommand) { return createDatanodeDetails(registeredCommand.getDatanodeUUID(), - registeredCommand.getHostName(), registeredCommand.getIpAddress()); + registeredCommand.getHostName(), registeredCommand.getIpAddress(), + null); } /** @@ -134,7 +152,7 @@ public static DatanodeDetails getDatanodeDetails( * @return DatanodeDetails */ private static DatanodeDetails createDatanodeDetails(String uuid, - String hostname, String ipAddress) { + String hostname, String ipAddress, String networkLocation) { DatanodeDetails.Port containerPort = DatanodeDetails.newPort( DatanodeDetails.Port.Name.STANDALONE, 0); DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( @@ -147,7 +165,8 @@ private static DatanodeDetails createDatanodeDetails(String uuid, .setIpAddress(ipAddress) .addPort(containerPort) .addPort(ratisPort) - .addPort(restPort); + .addPort(restPort) + .setNetworkLocation(networkLocation); return builder.build(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index 5da057e10e..bc921e3ce5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -96,7 +96,7 @@ public void setup() throws IOException, InterruptedException { containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class); Mockito.when(containerPlacementPolicy.chooseDatanodes( - 
Mockito.anyListOf(DatanodeDetails.class), + Mockito.anyListOf(DatanodeDetails.class), null, Mockito.anyInt(), Mockito.anyLong())) .thenAnswer(invocation -> { int count = (int) invocation.getArguments()[1]; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java index f406016a46..fb2a4c33df 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -78,8 +78,8 @@ public void chooseDatanodes() throws SCMException { for (int i = 0; i < 1000; i++) { //when - List datanodeDetails = - scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15); + List datanodeDetails = scmContainerPlacementRandom + .chooseDatanodes(existingNodes, null, 1, 15); //then Assert.assertEquals(1, datanodeDetails.size()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java new file mode 100644 index 0000000000..d80c7e5316 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.net.NodeSchema; +import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.when; + +/** + * Test for the scm container rack aware placement. 
+ */ +public class TestSCMContainerPlacementRackAware { + private NetworkTopology cluster; + private List datanodes = new ArrayList<>(); + // policy with fallback capability + private SCMContainerPlacementRackAware policy; + // policy prohibit fallback + private SCMContainerPlacementRackAware policyNoFallback; + // node storage capacity + private final long STORAGE_CAPACITY = 100L; + + @Before + public void setup() { + //initialize network topology instance + Configuration conf = new OzoneConfiguration(); + NodeSchema[] schemas = new NodeSchema[] + {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}; + NodeSchemaManager.getInstance().init(schemas, true); + cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance()); + + // build datanodes, and network topology + String rack = "/rack"; + String hostname = "node"; + for (int i = 0; i < 15; i++) { + // Totally 3 racks, each has 5 datanodes + DatanodeDetails node = TestUtils.createDatanodeDetails( + hostname + i, rack + (i / 5)); + datanodes.add(node); + cluster.add(node); + } + + // create mock node manager + NodeManager nodeManager = Mockito.mock(NodeManager.class); + when(nodeManager.getNodes(NodeState.HEALTHY)) + .thenReturn(new ArrayList<>(datanodes)); + when(nodeManager.getNodeStat(anyObject())) + .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 0L, 100L)); + when(nodeManager.getNodeStat(datanodes.get(2))) + .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 90L, 10L)); + when(nodeManager.getNodeStat(datanodes.get(3))) + .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 80L, 20L)); + when(nodeManager.getNodeStat(datanodes.get(4))) + .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 70L, 30L)); + + // create placement policy instances + policy = + new SCMContainerPlacementRackAware(nodeManager, conf, cluster, true); + policyNoFallback = + new SCMContainerPlacementRackAware(nodeManager, conf, cluster, false); + } + + + @Test + public void chooseNodeWithNoExcludedNodes() throws SCMException { + // test choose new datanodes for 
new pipeline cases + // 1 replica + int nodeNum = 1; + List<DatanodeDetails> datanodeDetails = + policy.chooseDatanodes(null, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + + // 2 replicas + nodeNum = 2; + datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(1))); + + // 3 replicas + nodeNum = 3; + datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(1))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(2))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), + datanodeDetails.get(2))); + + // 4 replicas + nodeNum = 4; + datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(1))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(2))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), + datanodeDetails.get(2))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(3))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2), + datanodeDetails.get(3))); + } + + @Test + public void chooseNodeWithExcludedNodes() throws SCMException { + // test choose new datanodes for under-replicated pipeline + // 3 replicas, two existing datanodes on same rack + int nodeNum = 1; + List<DatanodeDetails> excludedNodes = new ArrayList<>(); + + excludedNodes.add(datanodes.get(0)); + excludedNodes.add(datanodes.get(1)); + List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes( + excludedNodes, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + 
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + excludedNodes.get(0))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + excludedNodes.get(1))); + + // 3 replicas, two existing datanodes on different rack + excludedNodes.clear(); + excludedNodes.add(datanodes.get(0)); + excludedNodes.add(datanodes.get(7)); + datanodeDetails = policy.chooseDatanodes( + excludedNodes, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(cluster.isSameParent( + datanodeDetails.get(0), excludedNodes.get(0)) || + cluster.isSameParent(datanodeDetails.get(0), excludedNodes.get(1))); + + // 3 replicas, one existing datanode + nodeNum = 2; + excludedNodes.clear(); + excludedNodes.add(datanodes.get(0)); + datanodeDetails = policy.chooseDatanodes( + excludedNodes, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(cluster.isSameParent( + datanodeDetails.get(0), excludedNodes.get(0)) || + cluster.isSameParent(datanodeDetails.get(0), excludedNodes.get(1))); + } + + @Test + public void testFallback() throws SCMException { + + // 5 replicas. there are only 3 racks. policy with fallback should + // allocate the 5th datanode though it will break the rack rule (first + // 2 replicas on same rack, others are different racks). 
+ int nodeNum = 5; + List<DatanodeDetails> datanodeDetails = + policy.chooseDatanodes(null, null, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(1))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(2))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), + datanodeDetails.get(2))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), + datanodeDetails.get(3))); + Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2), + datanodeDetails.get(3))); + } + + + @Test(expected = SCMException.class) + public void testNoFallback() throws SCMException { + // 5 replicas. there are only 3 racks. policy prohibiting fallback should fail. + int nodeNum = 5; + policyNoFallback.chooseDatanodes(null, null, nodeNum, 15); + } + + @Test + public void chooseNodeWithFavoredNodes() throws SCMException { + int nodeNum = 1; + List<DatanodeDetails> excludedNodes = new ArrayList<>(); + List<DatanodeDetails> favoredNodes = new ArrayList<>(); + + // no excludedNodes, only favoredNodes + favoredNodes.add(datanodes.get(0)); + List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes( + excludedNodes, favoredNodes, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(datanodeDetails.get(0).getNetworkFullPath() + .equals(favoredNodes.get(0).getNetworkFullPath())); + + // no overlap between excludedNodes and favoredNodes, favoredNodes can be + // chosen. + excludedNodes.clear(); + favoredNodes.clear(); + excludedNodes.add(datanodes.get(0)); + favoredNodes.add(datanodes.get(2)); + datanodeDetails = policy.chooseDatanodes( + excludedNodes, favoredNodes, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertTrue(datanodeDetails.get(0).getNetworkFullPath() + .equals(favoredNodes.get(0).getNetworkFullPath())); + + // there is overlap between excludedNodes and favoredNodes, favoredNodes + // should not be chosen. 
+ excludedNodes.clear(); + favoredNodes.clear(); + excludedNodes.add(datanodes.get(0)); + favoredNodes.add(datanodes.get(0)); + datanodeDetails = policy.chooseDatanodes( + excludedNodes, favoredNodes, nodeNum, 15); + Assert.assertEquals(nodeNum, datanodeDetails.size()); + Assert.assertFalse(datanodeDetails.get(0).getNetworkFullPath() + .equals(favoredNodes.get(0).getNetworkFullPath())); + } + + @Test(expected = SCMException.class) + public void testNoInfiniteLoop() throws SCMException { + int nodeNum = 1; + // request storage space larger than node capacity + policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 15); + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java index d285a3f5ab..a20c6c019f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -67,8 +67,8 @@ public void chooseDatanodes() throws SCMException { for (int i = 0; i < 100; i++) { //when - List<DatanodeDetails> datanodeDetails = - scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15); + List<DatanodeDetails> datanodeDetails = scmContainerPlacementRandom + .chooseDatanodes(existingNodes, null, 1, 15); //then Assert.assertEquals(1, datanodeDetails.size()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java index 1c80880d01..bd62111cf1 100644 --- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -87,12 +87,12 @@ public void testCapacityPlacementYieldsBetterDataDistribution() throws for (int x = 0; x < opsCount; x++) { long containerSize = random.nextInt(100) * OzoneConsts.GB; List nodesCapacity = - capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, + capacityPlacer.chooseDatanodes(new ArrayList<>(), null, nodesRequired, containerSize); assertEquals(nodesRequired, nodesCapacity.size()); List nodesRandom = - randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, + randomPlacer.chooseDatanodes(nodesCapacity, null, nodesRequired, containerSize); // One fifth of all calls are delete