HDFS-15285. The same distance and load nodes don't shuffle when consider DataNode load. Contributed by Lisheng Sun.
commit 9ca6298a9a (parent 410c605aec)
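In short: with DFS_NAMENODE_READ_CONSIDERLOAD_KEY enabled, the secondary sorter ordered datanodes only by xceiver count, so nodes at the same network distance and with the same load always came back in the same fixed order. The patch moves the shuffle into NetworkTopology#sortByDistance so each bucket of equal-distance nodes is randomized before the optional load-based secondary sort, and drops the old default shuffle consumer in DatanodeManager accordingly.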
@@ -983,7 +983,10 @@ private <T extends Node> void sortByDistance(Node reader, T[] nodes,
     int idx = 0;
     for (List<T> list: tree.values()) {
       if (list != null) {
-        secondarySort.accept(list);
+        Collections.shuffle(list, r);
+        if (secondarySort != null) {
+          secondarySort.accept(list);
+        }
         for (T n: list) {
           nodes[idx] = n;
           idx++;
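Note the ordering: the bucket is shuffled first, then secondarySort (when non-null) runs on the shuffled list. Assuming the secondary sort is a stable java.util list sort, nodes that tie on the secondary key keep their randomized relative order, which is what fixes the "same distance and load" case in the commit title.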
@@ -548,8 +548,7 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
   }

   private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
-    Consumer<List<DatanodeInfo>> secondarySort =
-        list -> Collections.shuffle(list);
+    Consumer<List<DatanodeInfo>> secondarySort = null;
     if (readConsiderLoad) {
       Comparator<DatanodeInfo> comp =
           Comparator.comparingInt(DatanodeInfo::getXceiverCount);
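For illustration, a minimal standalone sketch of the shuffle-then-stable-sort idea (not Hadoop code; the class name, the string-encoded "name:load" nodes, and the load values are invented for this example). The least-loaded node always sorts first, while the two equally loaded nodes alternate across iterations:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class ShuffleThenStableSortSketch {
  public static void main(String[] args) {
    // Three "datanodes": dn-a and dn-b tie on load (2), dn-c is lighter (1).
    List<String> bucket = new ArrayList<>();
    Collections.addAll(bucket, "dn-a:2", "dn-b:2", "dn-c:1");
    Comparator<String> byLoad =
        Comparator.comparingInt(s -> Integer.parseInt(s.split(":")[1]));

    Random r = new Random();
    Set<String> secondPlace = new HashSet<>();
    for (int i = 0; i < 100; i++) {
      Collections.shuffle(bucket, r); // randomize first, as sortByDistance now does
      bucket.sort(byLoad);            // stable sort: tied nodes keep shuffled order
      secondPlace.add(bucket.get(1)); // dn-c always wins; who is second varies
    }
    System.out.println(secondPlace);  // expect both dn-a:2 and dn-b:2 over enough runs
  }
}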
@@ -28,11 +28,13 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -600,6 +602,72 @@ public void testGetBlockLocationConsiderLoad()
     assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
   }

+  @Test
+  public void testGetBlockLocationConsiderLoadWithNodesOfSameDistance()
+      throws IOException, URISyntaxException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    URL shellScript = getClass()
+        .getResource("/" + Shell.appendScriptExtension("topology-script"));
+    Path resourcePath = Paths.get(shellScript.toURI());
+    FileUtil.setExecutable(resourcePath.toFile(), true);
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        resourcePath.toString());
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+    int totalDNs = 5;
+    // Register 5 datanodes, 2 nodes per rack, all with the same load.
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    for (int i = 0; i < totalDNs; i++) {
+      // Register new datanode.
+      String uuid = "UUID-" + i;
+      String ip = "IP-" + i / 2 + "-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      dm.registerDatanode(dr);
+
+      // Get location and storage information.
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-" + i;
+
+      // Set the same load on every datanode.
+      locs[i].setXceiverCount(2);
+    }
+
+    // Set node 0 decommissioned.
+    locs[0].setDecommissioned();
+
+    // Create LocatedBlock with above locations.
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    // Test a client not in the cluster but on the same rack as locs[3].
+    // Number of iterations for the test.
+    int numIterations = 100;
+
+    Set<String> ipSet = new HashSet<>();
+    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+    for (int i = 0; i < numIterations; i++) {
+      dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+      DatanodeInfo[] sortedLocs = block.getLocations();
+      assertEquals(totalDNs, sortedLocs.length);
+      if (!ipSet.contains(sortedLocs[0].getIpAddr())) {
+        ipSet.add(sortedLocs[0].getIpAddr());
+      }
+    }
+
+    // When two nodes' distance and weight are the same, they are equally close.
+    assertEquals(2, ipSet.size());
+  }
+
   /**
    * Test whether removing a host from the includes list without adding it to
    * the excludes list will exclude it from data node reports.
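What the new test checks: nodes 2 and 3 (IP-1-2 and IP-1-3) share a rack, so for the client at IP-1-3-client they are at equal network distance, and all nodes carry the same load (xceiver count 2). Across 100 sorts, both should therefore appear in the first position at least once, which is why ipSet is expected to end up with exactly 2 entries; node 0 is decommissioned and never sorts first. (The exact rack assignment depends on the bundled topology-script resource; the "same rack" reading here is inferred from the IP naming scheme.)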