From 18e283576679da3b093b6d34a129818803382343 Mon Sep 17 00:00:00 2001 From: Ahmed Hussein <50450311+amahussein@users.noreply.github.com> Date: Fri, 8 Jan 2021 13:10:09 -0600 Subject: [PATCH] HADOOP-17408. Optimize NetworkTopology sorting block locations. (#2601). Contributed by Ahmed Hussein and Daryn Sharp. (cherry picked from commit 77435a025e5ba2172dc0b5aaf2da9537c6a978ce) --- .../apache/hadoop/net/NetworkTopology.java | 77 ++++++++----------- .../hadoop/net/TestNetworkTopology.java | 3 +- 2 files changed, 36 insertions(+), 44 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index d37aebce79..e27423125d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -19,7 +19,6 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -29,6 +28,8 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -52,6 +53,8 @@ public class NetworkTopology { private static final char PATH_SEPARATOR = '/'; private static final String PATH_SEPARATOR_STR = "/"; private static final String ROOT = "/"; + private static final AtomicReference RANDOM_REF = + new AtomicReference<>(); public static class InvalidTopologyException extends RuntimeException { private static final long serialVersionUID = 1L; @@ -396,17 +399,12 @@ static public int getDistanceByPath(Node node1, Node node2) { * @exception IllegalArgumentException when either node1 or node2 is null, or * node1 or node2 do not belong to the cluster */ - public boolean isOnSameRack( Node node1, Node node2) { + public boolean isOnSameRack(Node node1, Node node2) { if (node1 == null || node2 == null) { return false; } - - netlock.readLock().lock(); - try { - return isSameParents(node1, node2); - } finally { - netlock.readLock().unlock(); - } + + return isSameParents(node1, node2); } /** @@ -440,11 +438,14 @@ protected boolean isSameParents(Node node1, Node node2) { return node1.getParent()==node2.getParent(); } - private static final Random r = new Random(); - @VisibleForTesting void setRandomSeed(long seed) { - r.setSeed(seed); + RANDOM_REF.set(new Random(seed)); + } + + Random getRandom() { + Random random = RANDOM_REF.get(); + return (random == null) ? ThreadLocalRandom.current() : random; } /** @@ -563,6 +564,7 @@ private Node chooseRandom(final InnerNode parentNode, totalInScopeNodes, availableNodes); return null; } + Random r = getRandom(); if (excludedNodes == null || excludedNodes.isEmpty()) { // if there are no excludedNodes, randomly choose a node final int index = r.nextInt(totalInScopeNodes); @@ -879,7 +881,7 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) { * This method is called if the reader is a datanode, * so nonDataNodeReader flag is set to false. */ - sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list)); + sortByDistance(reader, nodes, activeLen, null); } /** @@ -922,8 +924,7 @@ public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes, * This method is called if the reader is not a datanode, * so nonDataNodeReader flag is set to true. */ - sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, - list -> Collections.shuffle(list)); + sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, null); } /** @@ -961,38 +962,28 @@ private void sortByDistance(Node reader, T[] nodes, int activeLen, Consumer> secondarySort, boolean nonDataNodeReader) { /** Sort weights for the nodes array */ - int[] weights = new int[activeLen]; - for (int i=0; i> weightedNodeTree = + new TreeMap<>(); + int nWeight; + for (int i = 0; i < activeLen; i++) { + if (nonDataNodeReader) { + nWeight = getWeightUsingNetworkLocation(reader, nodes[i]); } else { - weights[i] = getWeight(reader, nodes[i]); + nWeight = getWeight(reader, nodes[i]); } + weightedNodeTree.computeIfAbsent( + nWeight, k -> new ArrayList<>(1)).add(nodes[i]); } - // Add weight/node pairs to a TreeMap to sort - TreeMap> tree = new TreeMap<>(); - for (int i=0; i list = tree.get(weight); - if (list == null) { - list = Lists.newArrayListWithExpectedSize(1); - tree.put(weight, list); - } - list.add(node); - } - // Sort nodes which have the same weight using secondarySort. int idx = 0; - for (List list: tree.values()) { - if (list != null) { - Collections.shuffle(list, r); - if (secondarySort != null) { - secondarySort.accept(list); - } - for (T n: list) { - nodes[idx] = n; - idx++; - } + // Sort nodes which have the same weight using secondarySort. + for (List nodesList : weightedNodeTree.values()) { + Collections.shuffle(nodesList, getRandom()); + if (secondarySort != null) { + // a secondary sort breaks the tie between nodes. + secondarySort.accept(nodesList); + } + for (T n : nodesList) { + nodes[idx++] = n; } } Preconditions.checkState(idx == activeLen, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index 74c3f046ff..5758fe7986 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -56,7 +57,7 @@ public class TestNetworkTopology { private DatanodeDescriptor dataNodes[]; @Rule - public Timeout testTimeout = new Timeout(30000); + public Timeout testTimeout = new Timeout(30000, TimeUnit.MILLISECONDS); @Before public void setupDatanodes() {