HDFS-10206. Datanodes not sorted properly by distance when the reader isn't a datanode. (Nandakumar via mingma)
This commit is contained in:
parent
563480dccd
commit
c73e08a6da
@ -57,6 +57,10 @@ public class NetworkTopology {
|
|||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(NetworkTopology.class);
|
LoggerFactory.getLogger(NetworkTopology.class);
|
||||||
|
|
||||||
|
private static final char PATH_SEPARATOR = '/';
|
||||||
|
private static final String PATH_SEPARATOR_STR = "/";
|
||||||
|
private static final String ROOT = "/";
|
||||||
|
|
||||||
public static class InvalidTopologyException extends RuntimeException {
|
public static class InvalidTopologyException extends RuntimeException {
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
public InvalidTopologyException(String msg) {
|
public InvalidTopologyException(String msg) {
|
||||||
@ -916,7 +920,7 @@ public int countNumOfAvailableNodes(String scope,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** convert a network tree to a string */
|
/** convert a network tree to a string. */
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
// print the number of racks
|
// print the number of racks
|
||||||
@ -970,19 +974,108 @@ public static String getLastHalf(String networkLocation) {
|
|||||||
* @return weight
|
* @return weight
|
||||||
*/
|
*/
|
||||||
protected int getWeight(Node reader, Node node) {
|
protected int getWeight(Node reader, Node node) {
|
||||||
// 0 is local, 1 is same rack, 2 is off rack
|
// 0 is local, 2 is same rack, and each level on each node increases the
|
||||||
// Start off by initializing to off rack
|
//weight by 1
|
||||||
int weight = 2;
|
//Start off by initializing to Integer.MAX_VALUE
|
||||||
if (reader != null) {
|
int weight = Integer.MAX_VALUE;
|
||||||
|
if (reader != null && node != null) {
|
||||||
if(reader.equals(node)) {
|
if(reader.equals(node)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int maxReaderLevel = reader.getLevel();
|
||||||
|
int maxNodeLevel = node.getLevel();
|
||||||
|
int currentLevelToCompare = maxReaderLevel > maxNodeLevel ?
|
||||||
|
maxNodeLevel : maxReaderLevel;
|
||||||
|
Node r = reader;
|
||||||
|
Node n = node;
|
||||||
weight = 0;
|
weight = 0;
|
||||||
} else if (isOnSameRack(reader, node)) {
|
while(r != null && r.getLevel() > currentLevelToCompare) {
|
||||||
weight = 1;
|
r = r.getParent();
|
||||||
|
weight++;
|
||||||
|
}
|
||||||
|
while(n != null && n.getLevel() > currentLevelToCompare) {
|
||||||
|
n = n.getParent();
|
||||||
|
weight++;
|
||||||
|
}
|
||||||
|
while(r != null && n != null && !r.equals(n)) {
|
||||||
|
r = r.getParent();
|
||||||
|
n = n.getParent();
|
||||||
|
weight+=2;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return weight;
|
return weight;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an integer weight which specifies how far away <i>node</i> is
|
||||||
|
* from <i>reader</i>. A lower value signifies that a node is closer.
|
||||||
|
* It uses network location to calculate the weight
|
||||||
|
*
|
||||||
|
* @param reader Node where data will be read
|
||||||
|
* @param node Replica of data
|
||||||
|
* @return weight
|
||||||
|
*/
|
||||||
|
private static int getWeightUsingNetworkLocation(Node reader, Node node) {
|
||||||
|
//Start off by initializing to Integer.MAX_VALUE
|
||||||
|
int weight = Integer.MAX_VALUE;
|
||||||
|
if(reader != null && node != null) {
|
||||||
|
String readerPath = normalizeNetworkLocationPath(
|
||||||
|
reader.getNetworkLocation());
|
||||||
|
String nodePath = normalizeNetworkLocationPath(
|
||||||
|
node.getNetworkLocation());
|
||||||
|
|
||||||
|
//same rack
|
||||||
|
if(readerPath.equals(nodePath)) {
|
||||||
|
if(reader.getName().equals(node.getName())) {
|
||||||
|
weight = 0;
|
||||||
|
} else {
|
||||||
|
weight = 2;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
String[] readerPathToken = readerPath.split(PATH_SEPARATOR_STR);
|
||||||
|
String[] nodePathToken = nodePath.split(PATH_SEPARATOR_STR);
|
||||||
|
int maxLevelToCompare = readerPathToken.length > nodePathToken.length ?
|
||||||
|
nodePathToken.length : readerPathToken.length;
|
||||||
|
int currentLevel = 1;
|
||||||
|
//traverse through the path and calculate the distance
|
||||||
|
while(currentLevel < maxLevelToCompare) {
|
||||||
|
if(!readerPathToken[currentLevel]
|
||||||
|
.equals(nodePathToken[currentLevel])){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
currentLevel++;
|
||||||
|
}
|
||||||
|
weight = (readerPathToken.length - currentLevel) +
|
||||||
|
(nodePathToken.length - currentLevel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return weight;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Normalize a path by stripping off any trailing {@link #PATH_SEPARATOR}.
|
||||||
|
* @param path path to normalize.
|
||||||
|
* @return the normalised path
|
||||||
|
* If <i>path</i>is null or empty {@link #ROOT} is returned
|
||||||
|
* @throws IllegalArgumentException if the first character of a non empty path
|
||||||
|
* is not {@link #PATH_SEPARATOR}
|
||||||
|
*/
|
||||||
|
private static String normalizeNetworkLocationPath(String path) {
|
||||||
|
if (path == null || path.length() == 0) {
|
||||||
|
return ROOT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (path.charAt(0) != PATH_SEPARATOR) {
|
||||||
|
throw new IllegalArgumentException("Network Location"
|
||||||
|
+ "path doesn't start with " +PATH_SEPARATOR+ ": "+path);
|
||||||
|
}
|
||||||
|
|
||||||
|
int len = path.length();
|
||||||
|
if (path.charAt(len-1) == PATH_SEPARATOR) {
|
||||||
|
return path.substring(0, len-1);
|
||||||
|
}
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sort nodes array by network distance to <i>reader</i>.
|
* Sort nodes array by network distance to <i>reader</i>.
|
||||||
* <p/>
|
* <p/>
|
||||||
@ -999,11 +1092,56 @@ protected int getWeight(Node reader, Node node) {
|
|||||||
* @param activeLen Number of active nodes at the front of the array
|
* @param activeLen Number of active nodes at the front of the array
|
||||||
*/
|
*/
|
||||||
public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
|
public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
|
||||||
|
/*
|
||||||
|
* This method is called if the reader is a datanode,
|
||||||
|
* so nonDataNodeReader flag is set to false.
|
||||||
|
*/
|
||||||
|
sortByDistance(reader, nodes, activeLen, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sort nodes array by network distance to <i>reader</i>.
|
||||||
|
* <p/> using network location. This is used when the reader
|
||||||
|
* is not a datanode. Sorting the nodes based on network distance
|
||||||
|
* from the reader reduces network traffic and improves
|
||||||
|
* performance.
|
||||||
|
* <p/>
|
||||||
|
*
|
||||||
|
* @param reader Node where data will be read
|
||||||
|
* @param nodes Available replicas with the requested data
|
||||||
|
* @param activeLen Number of active nodes at the front of the array
|
||||||
|
*/
|
||||||
|
public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes,
|
||||||
|
int activeLen) {
|
||||||
|
/*
|
||||||
|
* This method is called if the reader is not a datanode,
|
||||||
|
* so nonDataNodeReader flag is set to true.
|
||||||
|
*/
|
||||||
|
sortByDistance(reader, nodes, activeLen, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sort nodes array by network distance to <i>reader</i>.
|
||||||
|
* <p/>
|
||||||
|
* As an additional twist, we also randomize the nodes at each network
|
||||||
|
* distance. This helps with load balancing when there is data skew.
|
||||||
|
*
|
||||||
|
* @param reader Node where data will be read
|
||||||
|
* @param nodes Available replicas with the requested data
|
||||||
|
* @param activeLen Number of active nodes at the front of the array
|
||||||
|
* @param nonDataNodeReader True if the reader is not a datanode
|
||||||
|
*/
|
||||||
|
private void sortByDistance(Node reader, Node[] nodes, int activeLen,
|
||||||
|
boolean nonDataNodeReader) {
|
||||||
/** Sort weights for the nodes array */
|
/** Sort weights for the nodes array */
|
||||||
int[] weights = new int[activeLen];
|
int[] weights = new int[activeLen];
|
||||||
for (int i=0; i<activeLen; i++) {
|
for (int i=0; i<activeLen; i++) {
|
||||||
|
if(nonDataNodeReader) {
|
||||||
|
weights[i] = getWeightUsingNetworkLocation(reader, nodes[i]);
|
||||||
|
} else {
|
||||||
weights[i] = getWeight(reader, nodes[i]);
|
weights[i] = getWeight(reader, nodes[i]);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Add weight/node pairs to a TreeMap to sort
|
// Add weight/node pairs to a TreeMap to sort
|
||||||
TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
|
TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
|
||||||
for (int i=0; i<activeLen; i++) {
|
for (int i=0; i<activeLen; i++) {
|
||||||
|
@ -443,8 +443,10 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
|||||||
Comparator<DatanodeInfo> comparator) {
|
Comparator<DatanodeInfo> comparator) {
|
||||||
// As it is possible for the separation of node manager and datanode,
|
// As it is possible for the separation of node manager and datanode,
|
||||||
// here we should get node but not datanode only .
|
// here we should get node but not datanode only .
|
||||||
|
boolean nonDatanodeReader = false;
|
||||||
Node client = getDatanodeByHost(targetHost);
|
Node client = getDatanodeByHost(targetHost);
|
||||||
if (client == null) {
|
if (client == null) {
|
||||||
|
nonDatanodeReader = true;
|
||||||
List<String> hosts = new ArrayList<>(1);
|
List<String> hosts = new ArrayList<>(1);
|
||||||
hosts.add(targetHost);
|
hosts.add(targetHost);
|
||||||
List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
|
List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
|
||||||
@ -470,8 +472,12 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
|||||||
--lastActiveIndex;
|
--lastActiveIndex;
|
||||||
}
|
}
|
||||||
int activeLen = lastActiveIndex + 1;
|
int activeLen = lastActiveIndex + 1;
|
||||||
|
if(nonDatanodeReader) {
|
||||||
|
networktopology.sortByDistanceUsingNetworkLocation(client,
|
||||||
|
lb.getLocations(), activeLen);
|
||||||
|
} else {
|
||||||
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
|
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
|
||||||
|
}
|
||||||
// must update cache since we modified locations array
|
// must update cache since we modified locations array
|
||||||
lb.updateCachedStorageInfo();
|
lb.updateCachedStorageInfo();
|
||||||
}
|
}
|
||||||
|
@ -220,11 +220,9 @@ public void testSortByDistance() throws Exception {
|
|||||||
testNodes[2] = dataNodes[3];
|
testNodes[2] = dataNodes[3];
|
||||||
cluster.setRandomSeed(0xDEAD);
|
cluster.setRandomSeed(0xDEAD);
|
||||||
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
|
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
|
||||||
// sortByDistance does not take the "data center" layer into consideration
|
|
||||||
// and it doesn't sort by getDistance, so 1, 5, 3 is also valid here
|
|
||||||
assertTrue(testNodes[0] == dataNodes[1]);
|
assertTrue(testNodes[0] == dataNodes[1]);
|
||||||
assertTrue(testNodes[1] == dataNodes[5]);
|
assertTrue(testNodes[1] == dataNodes[3]);
|
||||||
assertTrue(testNodes[2] == dataNodes[3]);
|
assertTrue(testNodes[2] == dataNodes[5]);
|
||||||
|
|
||||||
// Array of just rack-local nodes
|
// Array of just rack-local nodes
|
||||||
// Expect a random first node
|
// Expect a random first node
|
||||||
@ -264,6 +262,29 @@ public void testSortByDistance() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertTrue("Expected to find a different first location", foundRandom);
|
assertTrue("Expected to find a different first location", foundRandom);
|
||||||
|
|
||||||
|
//Reader is not a datanode, but is in one of the datanode's rack.
|
||||||
|
testNodes[0] = dataNodes[0];
|
||||||
|
testNodes[1] = dataNodes[5];
|
||||||
|
testNodes[2] = dataNodes[8];
|
||||||
|
Node rackClient = new NodeBase("/d3/r1/25.25.25");
|
||||||
|
cluster.setRandomSeed(0xDEADBEEF);
|
||||||
|
cluster.sortByDistance(rackClient, testNodes, testNodes.length);
|
||||||
|
assertTrue(testNodes[0] == dataNodes[8]);
|
||||||
|
assertTrue(testNodes[1] == dataNodes[5]);
|
||||||
|
assertTrue(testNodes[2] == dataNodes[0]);
|
||||||
|
|
||||||
|
//Reader is not a datanode , but is in one of the datanode's data center.
|
||||||
|
testNodes[0] = dataNodes[8];
|
||||||
|
testNodes[1] = dataNodes[5];
|
||||||
|
testNodes[2] = dataNodes[0];
|
||||||
|
Node dcClient = new NodeBase("/d1/r2/25.25.25");
|
||||||
|
cluster.setRandomSeed(0xDEADBEEF);
|
||||||
|
cluster.sortByDistance(dcClient, testNodes, testNodes.length);
|
||||||
|
assertTrue(testNodes[0] == dataNodes[0]);
|
||||||
|
assertTrue(testNodes[1] == dataNodes[5]);
|
||||||
|
assertTrue(testNodes[2] == dataNodes[8]);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user