HADOOP-17408. Optimize NetworkTopology sorting block locations. (#2601). Contributed by Ahmed Hussein and Daryn Sharp.

This commit is contained in:
Ahmed Hussein 2021-01-08 13:10:09 -06:00 committed by GitHub
parent e306f59421
commit 77435a025e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 44 deletions

View File

@ -19,7 +19,6 @@
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -29,6 +28,8 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -52,6 +53,8 @@ public class NetworkTopology {
private static final char PATH_SEPARATOR = '/'; private static final char PATH_SEPARATOR = '/';
private static final String PATH_SEPARATOR_STR = "/"; private static final String PATH_SEPARATOR_STR = "/";
private static final String ROOT = "/"; private static final String ROOT = "/";
private static final AtomicReference<Random> RANDOM_REF =
new AtomicReference<>();
public static class InvalidTopologyException extends RuntimeException { public static class InvalidTopologyException extends RuntimeException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ -399,12 +402,7 @@ public boolean isOnSameRack( Node node1, Node node2) {
return false; return false;
} }
netlock.readLock().lock();
try {
return isSameParents(node1, node2); return isSameParents(node1, node2);
} finally {
netlock.readLock().unlock();
}
} }
/** /**
@ -438,11 +436,14 @@ protected boolean isSameParents(Node node1, Node node2) {
return node1.getParent()==node2.getParent(); return node1.getParent()==node2.getParent();
} }
private static final Random r = new Random();
@VisibleForTesting @VisibleForTesting
void setRandomSeed(long seed) { void setRandomSeed(long seed) {
r.setSeed(seed); RANDOM_REF.set(new Random(seed));
}
Random getRandom() {
Random random = RANDOM_REF.get();
return (random == null) ? ThreadLocalRandom.current() : random;
} }
/** /**
@ -561,6 +562,7 @@ private Node chooseRandom(final InnerNode parentNode,
totalInScopeNodes, availableNodes); totalInScopeNodes, availableNodes);
return null; return null;
} }
Random r = getRandom();
if (excludedNodes == null || excludedNodes.isEmpty()) { if (excludedNodes == null || excludedNodes.isEmpty()) {
// if there are no excludedNodes, randomly choose a node // if there are no excludedNodes, randomly choose a node
final int index = r.nextInt(totalInScopeNodes); final int index = r.nextInt(totalInScopeNodes);
@ -876,7 +878,7 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
* This method is called if the reader is a datanode, * This method is called if the reader is a datanode,
* so nonDataNodeReader flag is set to false. * so nonDataNodeReader flag is set to false.
*/ */
sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list)); sortByDistance(reader, nodes, activeLen, null);
} }
/** /**
@ -919,8 +921,7 @@ public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes,
* This method is called if the reader is not a datanode, * This method is called if the reader is not a datanode,
* so nonDataNodeReader flag is set to true. * so nonDataNodeReader flag is set to true.
*/ */
sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, null);
list -> Collections.shuffle(list));
} }
/** /**
@ -958,38 +959,28 @@ private <T extends Node> void sortByDistance(Node reader, T[] nodes,
int activeLen, Consumer<List<T>> secondarySort, int activeLen, Consumer<List<T>> secondarySort,
boolean nonDataNodeReader) { boolean nonDataNodeReader) {
/** Sort weights for the nodes array */ /** Sort weights for the nodes array */
int[] weights = new int[activeLen]; TreeMap<Integer, List<T>> weightedNodeTree =
new TreeMap<>();
int nWeight;
for (int i = 0; i < activeLen; i++) { for (int i = 0; i < activeLen; i++) {
if (nonDataNodeReader) { if (nonDataNodeReader) {
weights[i] = getWeightUsingNetworkLocation(reader, nodes[i]); nWeight = getWeightUsingNetworkLocation(reader, nodes[i]);
} else { } else {
weights[i] = getWeight(reader, nodes[i]); nWeight = getWeight(reader, nodes[i]);
} }
weightedNodeTree.computeIfAbsent(
nWeight, k -> new ArrayList<>(1)).add(nodes[i]);
} }
// Add weight/node pairs to a TreeMap to sort
TreeMap<Integer, List<T>> tree = new TreeMap<>();
for (int i=0; i<activeLen; i++) {
int weight = weights[i];
T node = nodes[i];
List<T> list = tree.get(weight);
if (list == null) {
list = Lists.newArrayListWithExpectedSize(1);
tree.put(weight, list);
}
list.add(node);
}
// Sort nodes which have the same weight using secondarySort.
int idx = 0; int idx = 0;
for (List<T> list: tree.values()) { // Sort nodes which have the same weight using secondarySort.
if (list != null) { for (List<T> nodesList : weightedNodeTree.values()) {
Collections.shuffle(list, r); Collections.shuffle(nodesList, getRandom());
if (secondarySort != null) { if (secondarySort != null) {
secondarySort.accept(list); // a secondary sort breaks the tie between nodes.
} secondarySort.accept(nodesList);
for (T n: list) {
nodes[idx] = n;
idx++;
} }
for (T n : nodesList) {
nodes[idx++] = n;
} }
} }
Preconditions.checkState(idx == activeLen, Preconditions.checkState(idx == activeLen,

View File

@ -29,6 +29,7 @@
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
@ -56,7 +57,7 @@ public class TestNetworkTopology {
private DatanodeDescriptor dataNodes[]; private DatanodeDescriptor dataNodes[];
@Rule @Rule
public Timeout testTimeout = new Timeout(30000); public Timeout testTimeout = new Timeout(30000, TimeUnit.MILLISECONDS);
@Before @Before
public void setupDatanodes() { public void setupDatanodes() {