HADOOP-8469. Make NetworkTopology class pluggable. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1347867 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-06-08 02:29:40 +00:00
parent bce811c18b
commit 415ce38b82
5 changed files with 171 additions and 35 deletions

View File

@ -12,6 +12,9 @@ Trunk (unreleased changes)
HADOOP-8135. Add ByteBufferReadable interface to FSDataInputStream. (Henry HADOOP-8135. Add ByteBufferReadable interface to FSDataInputStream. (Henry
Robinson via atm) Robinson via atm)
HADOOP-8469. Make NetworkTopology class pluggable. (Junping Du via
szetszwo)
IMPROVEMENTS IMPROVEMENTS
HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution

View File

@ -63,7 +63,9 @@ public class CommonConfigurationKeysPublic {
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY = public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY =
"net.topology.node.switch.mapping.impl"; "net.topology.node.switch.mapping.impl";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String NET_TOPOLOGY_IMPL_KEY =
"net.topology.impl";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY = public static final String NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY =
"net.topology.table.file.name"; "net.topology.table.file.name";

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.net;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -55,8 +56,8 @@ public class NetworkTopology {
/** InnerNode represents a switch/router of a data center or rack. /** InnerNode represents a switch/router of a data center or rack.
* Different from a leaf node, it has non-null children. * Different from a leaf node, it has non-null children.
*/ */
private class InnerNode extends NodeBase { static class InnerNode extends NodeBase {
private ArrayList<Node> children=new ArrayList<Node>(); protected List<Node> children=new ArrayList<Node>();
private int numOfLeaves; private int numOfLeaves;
/** Construct an InnerNode from a path-like string */ /** Construct an InnerNode from a path-like string */
@ -76,7 +77,7 @@ public class NetworkTopology {
} }
/** @return its children */ /** @return its children */
Collection<Node> getChildren() {return children;} List<Node> getChildren() {return children;}
/** @return the number of children this node has */ /** @return the number of children this node has */
int getNumOfChildren() { int getNumOfChildren() {
@ -183,6 +184,22 @@ public class NetworkTopology {
} }
} }
/**
* Creates a parent node to be added to the list of children.
* Creates a node using the InnerNode four argument constructor specifying
* the name, location, parent, and level of this node.
*
* <p>To be overridden in subclasses for specific InnerNode implementations,
* as alternative to overriding the full {@link #add(Node)} method.
*
* @param parentName The name of the parent node
* @return A new inner node
* @see InnerNode#InnerNode(String, String, InnerNode, int)
*/
protected InnerNode createParentNode(String parentName) {
return new InnerNode(parentName, getPath(this), this, this.getLevel()+1);
}
/** Remove node <i>n</i> from the subtree of this node /** Remove node <i>n</i> from the subtree of this node
* @param n node to be deleted * @param n node to be deleted
* @return true if the node is deleted; false otherwise * @return true if the node is deleted; false otherwise
@ -263,7 +280,7 @@ public class NetworkTopology {
* @param excludedNode an excluded node (can be null) * @param excludedNode an excluded node (can be null)
* @return * @return
*/ */
private Node getLeaf(int leafIndex, Node excludedNode) { Node getLeaf(int leafIndex, Node excludedNode) {
int count=0; int count=0;
// check if the excluded node a leaf // check if the excluded node a leaf
boolean isLeaf = boolean isLeaf =
@ -309,6 +326,20 @@ public class NetworkTopology {
} }
} }
/**
* Determine if children a leaves, default implementation calls {@link #isRack()}
* <p>To be overridden in subclasses for specific InnerNode implementations,
* as alternative to overriding the full {@link #getLeaf(int, Node)} method.
*
* @return true if children are leaves, false otherwise
*/
protected boolean areChildrenLeaves() {
return isRack();
}
/**
* Get number of leaves.
*/
int getNumOfLeaves() { int getNumOfLeaves() {
return numOfLeaves; return numOfLeaves;
} }
@ -317,16 +348,16 @@ public class NetworkTopology {
/** /**
* the root cluster map * the root cluster map
*/ */
InnerNode clusterMap = new InnerNode(InnerNode.ROOT); InnerNode clusterMap;
/** Depth of all leaf nodes */ /** Depth of all leaf nodes */
private int depthOfAllLeaves = -1; private int depthOfAllLeaves = -1;
/** rack counter */ /** rack counter */
private int numOfRacks = 0; protected int numOfRacks = 0;
/** the lock used to manage access */ /** the lock used to manage access */
private ReadWriteLock netlock; protected ReadWriteLock netlock = new ReentrantReadWriteLock();
public NetworkTopology() { public NetworkTopology() {
netlock = new ReentrantReadWriteLock(); clusterMap = new InnerNode(InnerNode.ROOT);
} }
/** Add a leaf node /** Add a leaf node
@ -344,7 +375,7 @@ public class NetworkTopology {
} }
netlock.writeLock().lock(); netlock.writeLock().lock();
try { try {
Node rack = getNode(node.getNetworkLocation()); Node rack = getNodeForNetworkLocation(node);
if (rack != null && !(rack instanceof InnerNode)) { if (rack != null && !(rack instanceof InnerNode)) {
throw new IllegalArgumentException("Unexpected data node " throw new IllegalArgumentException("Unexpected data node "
+ node.toString() + node.toString()
@ -377,6 +408,25 @@ public class NetworkTopology {
} }
} }
/**
* Return a reference to the node given its string representation.
* Default implementation delegates to {@link #getNode(String)}.
*
* <p>To be overridden in subclasses for specific NetworkTopology
* implementations, as alternative to overriding the full {@link #add(Node)}
* method.
*
* @param node The string representation of this node's network location is
* used to retrieve a Node object.
* @return a reference to the node; null if the node is not in the tree
*
* @see #add(Node)
* @see #getNode(String)
*/
protected Node getNodeForNetworkLocation(Node node) {
return getNode(node.getNetworkLocation());
}
/** Remove a node /** Remove a node
* Update node counter and rack counter if necessary * Update node counter and rack counter if necessary
* @param node node to be removed; can be null * @param node node to be removed; can be null
@ -444,6 +494,20 @@ public class NetworkTopology {
} }
} }
/** Given a string representation of a rack for a specific network
* location
*
* To be overridden in subclasses for specific NetworkTopology
* implementations, as alternative to overriding the full
* {@link #getRack(String)} method.
* @param loc
* a path-like string representation of a network location
* @return a rack string
*/
public String getRack(String loc) {
return loc;
}
/** @return the total number of racks */ /** @return the total number of racks */
public int getNumOfRacks() { public int getNumOfRacks() {
netlock.readLock().lock(); netlock.readLock().lock();
@ -525,13 +589,44 @@ public class NetworkTopology {
netlock.readLock().lock(); netlock.readLock().lock();
try { try {
return node1.getParent()==node2.getParent(); return isSameParents(node1, node2);
} finally { } finally {
netlock.readLock().unlock(); netlock.readLock().unlock();
} }
} }
final private static Random r = new Random(); /**
* Check if network topology is aware of NodeGroup
*/
public boolean isNodeGroupAware() {
return false;
}
/**
* Return false directly as not aware of NodeGroup, to be override in sub-class
*/
public boolean isOnSameNodeGroup(Node node1, Node node2) {
return false;
}
/**
* Compare the parents of each node for equality
*
* <p>To be overridden in subclasses for specific NetworkTopology
* implementations, as alternative to overriding the full
* {@link #isOnSameRack(Node, Node)} method.
*
* @param node1 the first node to compare
* @param node2 the second node to compare
* @return true if their parents are equal, false otherwise
*
* @see #isOnSameRack(Node, Node)
*/
protected boolean isSameParents(Node node1, Node node2) {
return node1.getParent()==node2.getParent();
}
final protected static Random r = new Random();
/** randomly choose one node from <i>scope</i> /** randomly choose one node from <i>scope</i>
* if scope starts with ~, choose one from the all nodes except for the * if scope starts with ~, choose one from the all nodes except for the
* ones in <i>scope</i>; otherwise, choose one from <i>scope</i> * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
@ -580,6 +675,24 @@ public class NetworkTopology {
return innerNode.getLeaf(leaveIndex, node); return innerNode.getLeaf(leaveIndex, node);
} }
/** return leaves in <i>scope</i>
* @param scope a path string
* @return leaves nodes under specific scope
*/
public List<Node> getLeaves(String scope) {
Node node = getNode(scope);
List<Node> leafNodes = new ArrayList<Node>();
if (!(node instanceof InnerNode)) {
leafNodes.add(node);
} else {
InnerNode innerNode = (InnerNode) node;
for (int i=0;i<innerNode.getNumOfLeaves();i++) {
leafNodes.add(innerNode.getLeaf(i, null));
}
}
return leafNodes;
}
/** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i> /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
* if scope starts with ~, return the number of nodes that are not * if scope starts with ~, return the number of nodes that are not
* in <i>scope</i> and <i>excludedNodes</i>; * in <i>scope</i> and <i>excludedNodes</i>;
@ -640,13 +753,12 @@ public class NetworkTopology {
return tree.toString(); return tree.toString();
} }
/* swap two array items */ /** swap two array items */
static private void swap(Node[] nodes, int i, int j) { static protected void swap(Node[] nodes, int i, int j) {
Node tempNode; Node tempNode;
tempNode = nodes[j]; tempNode = nodes[j];
nodes[j] = nodes[i]; nodes[j] = nodes[i];
nodes[i] = tempNode; nodes[i] = tempNode;
} }
/** Sort nodes array by their distances to <i>reader</i> /** Sort nodes array by their distances to <i>reader</i>
@ -697,4 +809,5 @@ public class NetworkTopology {
swap(nodes, 0, r.nextInt(nodes.length)); swap(nodes, 0, r.nextInt(nodes.length));
} }
} }
} }

View File

@ -199,10 +199,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
this.xceiverCount = xceiverCount; this.xceiverCount = xceiverCount;
} }
/** rack name */ /** network location */
public synchronized String getNetworkLocation() {return location;} public synchronized String getNetworkLocation() {return location;}
/** Sets the rack name */ /** Sets the network location */
public synchronized void setNetworkLocation(String location) { public synchronized void setNetworkLocation(String location) {
this.location = NodeBase.normalize(location); this.location = NodeBase.normalize(location);
} }

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -66,6 +67,8 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
@ -108,7 +111,7 @@ public class DatanodeManager {
= new TreeMap<String, DatanodeDescriptor>(); = new TreeMap<String, DatanodeDescriptor>();
/** Cluster network topology */ /** Cluster network topology */
private final NetworkTopology networktopology = new NetworkTopology(); private final NetworkTopology networktopology;
/** Host names to datanode descriptors mapping. */ /** Host names to datanode descriptors mapping. */
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap(); private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
@ -135,6 +138,12 @@ public class DatanodeManager {
this.namesystem = namesystem; this.namesystem = namesystem;
this.blockManager = blockManager; this.blockManager = blockManager;
Class<? extends NetworkTopology> networkTopologyClass =
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopology.class, NetworkTopology.class);
networktopology = (NetworkTopology) ReflectionUtils.newInstance(
networkTopologyClass, conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.hostsReader = new HostsFileReader( this.hostsReader = new HostsFileReader(
@ -206,7 +215,16 @@ public class DatanodeManager {
public void sortLocatedBlocks(final String targethost, public void sortLocatedBlocks(final String targethost,
final List<LocatedBlock> locatedblocks) { final List<LocatedBlock> locatedblocks) {
//sort the blocks //sort the blocks
final DatanodeDescriptor client = getDatanodeByHost(targethost); // As it is possible for the separation of node manager and datanode,
// here we should get node but not datanode only .
Node client = getDatanodeByHost(targethost);
if (client == null) {
List<String> hosts = new ArrayList<String> (1);
hosts.add(targethost);
String rName = dnsToSwitchMapping.resolve(hosts).get(0);
if (rName != null)
client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
}
for (LocatedBlock b : locatedblocks) { for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations()); networktopology.pseudoSortByDistance(client, b.getLocations());