HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> Reviewed-by: Inigo Goiri <inigoiri@apache.org> Reviewed-by: Istvan Fajth <pifta@cloudera.com>
This commit is contained in:
parent
9f0610fb83
commit
c892a879dd
@ -31,6 +31,7 @@
|
||||
import java.util.*;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/** The class represents a cluster of computer with a tree hierarchical
|
||||
* network topology.
|
||||
@ -874,11 +875,33 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
|
||||
* This method is called if the reader is a datanode,
|
||||
* so nonDataNodeReader flag is set to false.
|
||||
*/
|
||||
sortByDistance(reader, nodes, activeLen, false);
|
||||
sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort nodes array by network distance to <i>reader</i>.
|
||||
* Sort nodes array by network distance to <i>reader</i> with secondary sort.
|
||||
* <p>
|
||||
* In a three-level topology, a node can be either local, on the same rack,
|
||||
* or on a different rack from the reader. Sorting the nodes based on network
|
||||
* distance from the reader reduces network traffic and improves
|
||||
* performance.
|
||||
* <p>
|
||||
* As an additional twist, we also randomize the nodes at each network
|
||||
* distance. This helps with load balancing when there is data skew.
|
||||
*
|
||||
* @param reader Node where data will be read
|
||||
* @param nodes Available replicas with the requested data
|
||||
* @param activeLen Number of active nodes at the front of the array
|
||||
* @param secondarySort a secondary sorting strategy which can inject into
|
||||
* that point from outside to help sort the same distance.
|
||||
*/
|
||||
public <T extends Node> void sortByDistance(Node reader, T[] nodes,
|
||||
int activeLen, Consumer<List<T>> secondarySort){
|
||||
sortByDistance(reader, nodes, activeLen, secondarySort, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort nodes array by network distance to <i>reader</i> with secondary sort.
|
||||
* <p> using network location. This is used when the reader
|
||||
* is not a datanode. Sorting the nodes based on network distance
|
||||
* from the reader reduces network traffic and improves
|
||||
@ -895,7 +918,27 @@ public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes,
|
||||
* This method is called if the reader is not a datanode,
|
||||
* so nonDataNodeReader flag is set to true.
|
||||
*/
|
||||
sortByDistance(reader, nodes, activeLen, true);
|
||||
sortByDistanceUsingNetworkLocation(reader, nodes, activeLen,
|
||||
list -> Collections.shuffle(list));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort nodes array by network distance to <i>reader</i>.
|
||||
* <p> using network location. This is used when the reader
|
||||
* is not a datanode. Sorting the nodes based on network distance
|
||||
* from the reader reduces network traffic and improves
|
||||
* performance.
|
||||
* <p>
|
||||
*
|
||||
* @param reader Node where data will be read
|
||||
* @param nodes Available replicas with the requested data
|
||||
* @param activeLen Number of active nodes at the front of the array
|
||||
* @param secondarySort a secondary sorting strategy which can inject into
|
||||
* that point from outside to help sort the same distance.
|
||||
*/
|
||||
public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
|
||||
T[] nodes, int activeLen, Consumer<List<T>> secondarySort) {
|
||||
sortByDistance(reader, nodes, activeLen, secondarySort, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -909,7 +952,8 @@ public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes,
|
||||
* @param activeLen Number of active nodes at the front of the array
|
||||
* @param nonDataNodeReader True if the reader is not a datanode
|
||||
*/
|
||||
private void sortByDistance(Node reader, Node[] nodes, int activeLen,
|
||||
private <T extends Node> void sortByDistance(Node reader, T[] nodes,
|
||||
int activeLen, Consumer<List<T>> secondarySort,
|
||||
boolean nonDataNodeReader) {
|
||||
/** Sort weights for the nodes array */
|
||||
int[] weights = new int[activeLen];
|
||||
@ -921,23 +965,23 @@ private void sortByDistance(Node reader, Node[] nodes, int activeLen,
|
||||
}
|
||||
}
|
||||
// Add weight/node pairs to a TreeMap to sort
|
||||
TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
|
||||
TreeMap<Integer, List<T>> tree = new TreeMap<>();
|
||||
for (int i=0; i<activeLen; i++) {
|
||||
int weight = weights[i];
|
||||
Node node = nodes[i];
|
||||
List<Node> list = tree.get(weight);
|
||||
T node = nodes[i];
|
||||
List<T> list = tree.get(weight);
|
||||
if (list == null) {
|
||||
list = Lists.newArrayListWithExpectedSize(1);
|
||||
tree.put(weight, list);
|
||||
}
|
||||
list.add(node);
|
||||
}
|
||||
|
||||
// Sort nodes which have the same weight using secondarySort.
|
||||
int idx = 0;
|
||||
for (List<Node> list: tree.values()) {
|
||||
for (List<T> list: tree.values()) {
|
||||
if (list != null) {
|
||||
Collections.shuffle(list, r);
|
||||
for (Node n: list) {
|
||||
secondarySort.accept(list);
|
||||
for (T n: list) {
|
||||
nodes[idx] = n;
|
||||
idx++;
|
||||
}
|
||||
@ -946,4 +990,4 @@ private void sortByDistance(Node reader, Node[] nodes, int activeLen,
|
||||
Preconditions.checkState(idx == activeLen,
|
||||
"Sorted the wrong number of nodes!");
|
||||
}
|
||||
}
|
||||
}
|
@ -230,6 +230,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
|
||||
public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
|
||||
true;
|
||||
public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
|
||||
"dfs.namenode.read.considerLoad";
|
||||
public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
|
||||
false;
|
||||
public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
|
||||
"dfs.namenode.redundancy.considerLoad.factor";
|
||||
public static final double
|
||||
|
@ -30,7 +30,6 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
@ -66,6 +65,7 @@
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Manage datanodes, include decommission and other activities.
|
||||
@ -134,6 +134,9 @@ public class DatanodeManager {
|
||||
/** Whether or not to avoid using stale DataNodes for reading */
|
||||
private final boolean avoidStaleDataNodesForRead;
|
||||
|
||||
/** Whether or not to consider lad for reading. */
|
||||
private final boolean readConsiderLoad;
|
||||
|
||||
/**
|
||||
* Whether or not to avoid using stale DataNodes for writing.
|
||||
* Note that, even if this is configured, the policy may be
|
||||
@ -314,6 +317,9 @@ public class DatanodeManager {
|
||||
this.avoidStaleDataNodesForRead = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
|
||||
this.readConsiderLoad = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
|
||||
this.avoidStaleDataNodesForWrite = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
|
||||
@ -530,9 +536,10 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
||||
int activeLen = lastActiveIndex + 1;
|
||||
if(nonDatanodeReader) {
|
||||
networktopology.sortByDistanceUsingNetworkLocation(client,
|
||||
lb.getLocations(), activeLen);
|
||||
lb.getLocations(), activeLen, createSecondaryNodeSorter());
|
||||
} else {
|
||||
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
|
||||
networktopology.sortByDistance(client, lb.getLocations(), activeLen,
|
||||
createSecondaryNodeSorter());
|
||||
}
|
||||
// move PROVIDED storage to the end to prefer local replicas.
|
||||
lb.moveProvidedToEnd(activeLen);
|
||||
@ -540,6 +547,17 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
||||
lb.updateCachedStorageInfo();
|
||||
}
|
||||
|
||||
private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
|
||||
Consumer<List<DatanodeInfo>> secondarySort =
|
||||
list -> Collections.shuffle(list);
|
||||
if (readConsiderLoad) {
|
||||
Comparator<DatanodeInfo> comp =
|
||||
Comparator.comparingInt(DatanodeInfo::getXceiverCount);
|
||||
secondarySort = list -> Collections.sort(list, comp);
|
||||
}
|
||||
return secondarySort;
|
||||
}
|
||||
|
||||
/** @return the datanode descriptor for the host. */
|
||||
public DatanodeDescriptor getDatanodeByHost(final String host) {
|
||||
return host2DatanodeMap.getDatanodeByHost(host);
|
||||
|
@ -307,7 +307,9 @@
|
||||
<property>
|
||||
<name>dfs.namenode.redundancy.considerLoad</name>
|
||||
<value>true</value>
|
||||
<description>Decide if chooseTarget considers the target's load or not
|
||||
<description>
|
||||
Decide if chooseTarget considers the target's load or not when write.
|
||||
Turn on by default.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
@ -319,6 +321,15 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.read.considerLoad</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Decide if sort block locations considers the target's load or not when read.
|
||||
Turn off by default.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.httpserver.filter.handlers</name>
|
||||
<value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
|
||||
|
@ -518,6 +518,88 @@ public void testGetBlockLocations()
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs2[0].getIpAddr());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBlockLocationConsiderLoad()
|
||||
throws IOException, URISyntaxException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
||||
URL shellScript = getClass().getResource(
|
||||
"/" + Shell.appendScriptExtension("topology-script"));
|
||||
Path resourcePath = Paths.get(shellScript.toURI());
|
||||
FileUtil.setExecutable(resourcePath.toFile(), true);
|
||||
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
|
||||
resourcePath.toString());
|
||||
DatanodeManager dm = mockDatanodeManager(fsn, conf);
|
||||
|
||||
int totalDNs = 5;
|
||||
// Register 5 datanodes and 2 nodes per rack with different load.
|
||||
DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
|
||||
String[] storageIDs = new String[totalDNs];
|
||||
for (int i = 0; i < totalDNs; i++) {
|
||||
// Register new datanode.
|
||||
String uuid = "UUID-" + i;
|
||||
String ip = "IP-" + i / 2 + "-" + i;
|
||||
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
|
||||
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
|
||||
Mockito.when(dr.getIpAddr()).thenReturn(ip);
|
||||
dm.registerDatanode(dr);
|
||||
|
||||
// Get location and storage information.
|
||||
locs[i] = dm.getDatanode(uuid);
|
||||
storageIDs[i] = "storageID-" + i;
|
||||
|
||||
// Set load for datanodes.
|
||||
locs[i].setXceiverCount(i);
|
||||
}
|
||||
|
||||
// Set node 0 decommissioned.
|
||||
locs[0].setDecommissioned();
|
||||
|
||||
// Create LocatedBlock with above locations.
|
||||
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
|
||||
LocatedBlock block = new LocatedBlock(b, locs);
|
||||
List<LocatedBlock> blocks = new ArrayList<>();
|
||||
blocks.add(block);
|
||||
|
||||
// Test client located at locs[3] in cluster.
|
||||
final String targetIpInCluster = locs[3].getIpAddr();
|
||||
dm.sortLocatedBlocks(targetIpInCluster, blocks);
|
||||
DatanodeInfo[] sortedLocs = block.getLocations();
|
||||
assertEquals(totalDNs, sortedLocs.length);
|
||||
// Ensure the local node is first.
|
||||
assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
|
||||
// Ensure the lightweight node is more close when distance is same.
|
||||
assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
|
||||
assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
|
||||
assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
|
||||
// Ensure the two decommissioned DNs were moved to the end.
|
||||
assertThat(sortedLocs[4].getAdminState(),
|
||||
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
|
||||
assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
|
||||
|
||||
// Test client not in cluster but same rack with locs[3].
|
||||
final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
|
||||
dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
|
||||
DatanodeInfo[] sortedLocs2 = block.getLocations();
|
||||
assertEquals(totalDNs, sortedLocs2.length);
|
||||
// Ensure the local rack is first and lightweight node is first
|
||||
// when distance is same.
|
||||
assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
|
||||
assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
|
||||
assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
|
||||
// Ensure the two decommissioned DNs were moved to the end.
|
||||
assertThat(sortedLocs[4].getAdminState(),
|
||||
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
|
||||
assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether removing a host from the includes list without adding it to
|
||||
* the excludes list will exclude it from data node reports.
|
||||
|
@ -241,10 +241,16 @@ public void testSortByDistance() throws Exception {
|
||||
cluster.setRandomSeed(0xDEADBEEF);
|
||||
cluster.sortByDistance(dataNodes[8], dtestNodes, dtestNodes.length - 2);
|
||||
assertTrue(dtestNodes[0] == dataNodes[8]);
|
||||
assertTrue(dtestNodes[1] == dataNodes[11]);
|
||||
assertTrue(dtestNodes[2] == dataNodes[12]);
|
||||
assertTrue(dtestNodes[3] == dataNodes[9]);
|
||||
assertTrue(dtestNodes[4] == dataNodes[10]);
|
||||
assertTrue(dtestNodes[1] != dtestNodes[2]);
|
||||
assertTrue(dtestNodes[1] == dataNodes[11]
|
||||
|| dtestNodes[1] == dataNodes[12]);
|
||||
assertTrue(dtestNodes[2] == dataNodes[11]
|
||||
|| dtestNodes[2] == dataNodes[12]);
|
||||
assertTrue(dtestNodes[3] != dtestNodes[4]);
|
||||
assertTrue(dtestNodes[3] == dataNodes[9]
|
||||
|| dtestNodes[3] == dataNodes[10]);
|
||||
assertTrue(dtestNodes[4] == dataNodes[9]
|
||||
|| dtestNodes[4] == dataNodes[10]);
|
||||
|
||||
// array contains local node
|
||||
testNodes[0] = dataNodes[1];
|
||||
@ -331,10 +337,14 @@ public void testSortByDistance() throws Exception {
|
||||
testNodes[2] = dataNodes[8];
|
||||
Node rackClient = new NodeBase("/d3/r1/25.25.25");
|
||||
cluster.setRandomSeed(0xDEADBEEF);
|
||||
cluster.sortByDistance(rackClient, testNodes, testNodes.length);
|
||||
cluster.sortByDistanceUsingNetworkLocation(rackClient, testNodes,
|
||||
testNodes.length);
|
||||
assertTrue(testNodes[0] == dataNodes[8]);
|
||||
assertTrue(testNodes[1] == dataNodes[5]);
|
||||
assertTrue(testNodes[2] == dataNodes[0]);
|
||||
assertTrue(testNodes[1] != testNodes[2]);
|
||||
assertTrue(testNodes[1] == dataNodes[0]
|
||||
|| testNodes[1] == dataNodes[5]);
|
||||
assertTrue(testNodes[2] == dataNodes[0]
|
||||
|| testNodes[2] == dataNodes[5]);
|
||||
|
||||
//Reader is not a datanode , but is in one of the datanode's data center.
|
||||
testNodes[0] = dataNodes[8];
|
||||
@ -342,10 +352,14 @@ public void testSortByDistance() throws Exception {
|
||||
testNodes[2] = dataNodes[0];
|
||||
Node dcClient = new NodeBase("/d1/r2/25.25.25");
|
||||
cluster.setRandomSeed(0xDEADBEEF);
|
||||
cluster.sortByDistance(dcClient, testNodes, testNodes.length);
|
||||
cluster.sortByDistanceUsingNetworkLocation(dcClient, testNodes,
|
||||
testNodes.length);
|
||||
assertTrue(testNodes[0] == dataNodes[0]);
|
||||
assertTrue(testNodes[1] == dataNodes[5]);
|
||||
assertTrue(testNodes[2] == dataNodes[8]);
|
||||
assertTrue(testNodes[1] != testNodes[2]);
|
||||
assertTrue(testNodes[1] == dataNodes[5]
|
||||
|| testNodes[1] == dataNodes[8]);
|
||||
assertTrue(testNodes[2] == dataNodes[5]
|
||||
|| testNodes[2] == dataNodes[8]);
|
||||
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user