diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 37148039d2..4ac177755a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -12,6 +12,9 @@ Trunk (unreleased changes) HADOOP-8135. Add ByteBufferReadable interface to FSDataInputStream. (Henry Robinson via atm) + HADOOP-8469. Make NetworkTopology class pluggable. (Junping Du via + szetszwo) + IMPROVEMENTS HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 67f3bc594c..c367294399 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -63,7 +63,9 @@ public class CommonConfigurationKeysPublic { /** See core-default.xml */ public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY = "net.topology.node.switch.mapping.impl"; - + /** See core-default.xml */ + public static final String NET_TOPOLOGY_IMPL_KEY = + "net.topology.impl"; /** See core-default.xml */ public static final String NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY = "net.topology.table.file.name"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index da8fab2956..892ba07359 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -19,6 +19,7 @@ package org.apache.hadoop.net; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -55,8 +56,8 @@ public class NetworkTopology { /** InnerNode represents a switch/router of a data center or rack. * Different from a leaf node, it has non-null children. */ - private class InnerNode extends NodeBase { - private ArrayList children=new ArrayList(); + static class InnerNode extends NodeBase { + protected List children=new ArrayList(); private int numOfLeaves; /** Construct an InnerNode from a path-like string */ @@ -76,7 +77,7 @@ public class NetworkTopology { } /** @return its children */ - Collection getChildren() {return children;} + List getChildren() {return children;} /** @return the number of children this node has */ int getNumOfChildren() { @@ -182,7 +183,23 @@ public class NetworkTopology { } } } - + + /** + * Creates a parent node to be added to the list of children. + * Creates a node using the InnerNode four argument constructor specifying + * the name, location, parent, and level of this node. + * + *

To be overridden in subclasses for specific InnerNode implementations, + * as alternative to overriding the full {@link #add(Node)} method. + * + * @param parentName The name of the parent node + * @return A new inner node + * @see InnerNode#InnerNode(String, String, InnerNode, int) + */ + protected InnerNode createParentNode(String parentName) { + return new InnerNode(parentName, getPath(this), this, this.getLevel()+1); + } + /** Remove node n from the subtree of this node * @param n node to be deleted * @return true if the node is deleted; false otherwise @@ -263,7 +280,7 @@ public class NetworkTopology { * @param excludedNode an excluded node (can be null) * @return */ - private Node getLeaf(int leafIndex, Node excludedNode) { + Node getLeaf(int leafIndex, Node excludedNode) { int count=0; // check if the excluded node a leaf boolean isLeaf = @@ -308,7 +325,21 @@ public class NetworkTopology { return null; } } - + + /** + * Determine if children a leaves, default implementation calls {@link #isRack()} + *

To be overridden in subclasses for specific InnerNode implementations, + * as alternative to overriding the full {@link #getLeaf(int, Node)} method. + * + * @return true if children are leaves, false otherwise + */ + protected boolean areChildrenLeaves() { + return isRack(); + } + + /** + * Get number of leaves. + */ int getNumOfLeaves() { return numOfLeaves; } @@ -317,18 +348,18 @@ public class NetworkTopology { /** * the root cluster map */ - InnerNode clusterMap = new InnerNode(InnerNode.ROOT); + InnerNode clusterMap; /** Depth of all leaf nodes */ private int depthOfAllLeaves = -1; /** rack counter */ - private int numOfRacks = 0; + protected int numOfRacks = 0; /** the lock used to manage access */ - private ReadWriteLock netlock; - + protected ReadWriteLock netlock = new ReentrantReadWriteLock(); + public NetworkTopology() { - netlock = new ReentrantReadWriteLock(); + clusterMap = new InnerNode(InnerNode.ROOT); } - + /** Add a leaf node * Update node counter & rack counter if necessary * @param node node to be added; can be null @@ -344,7 +375,7 @@ public class NetworkTopology { } netlock.writeLock().lock(); try { - Node rack = getNode(node.getNetworkLocation()); + Node rack = getNodeForNetworkLocation(node); if (rack != null && !(rack instanceof InnerNode)) { throw new IllegalArgumentException("Unexpected data node " + node.toString() @@ -376,7 +407,26 @@ public class NetworkTopology { netlock.writeLock().unlock(); } } - + + /** + * Return a reference to the node given its string representation. + * Default implementation delegates to {@link #getNode(String)}. + * + *

To be overridden in subclasses for specific NetworkTopology + * implementations, as alternative to overriding the full {@link #add(Node)} + * method. + * + * @param node The string representation of this node's network location is + * used to retrieve a Node object. + * @return a reference to the node; null if the node is not in the tree + * + * @see #add(Node) + * @see #getNode(String) + */ + protected Node getNodeForNetworkLocation(Node node) { + return getNode(node.getNetworkLocation()); + } + /** Remove a node * Update node counter and rack counter if necessary * @param node node to be removed; can be null @@ -403,7 +453,7 @@ public class NetworkTopology { netlock.writeLock().unlock(); } } - + /** Check if the tree contains node node * * @param node a node @@ -443,7 +493,21 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + + /** Given a string representation of a rack for a specific network + * location + * + * To be overridden in subclasses for specific NetworkTopology + * implementations, as alternative to overriding the full + * {@link #getRack(String)} method. + * @param loc + * a path-like string representation of a network location + * @return a rack string + */ + public String getRack(String loc) { + return loc; + } + /** @return the total number of racks */ public int getNumOfRacks() { netlock.readLock().lock(); @@ -453,7 +517,7 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + /** @return the total number of leaf nodes */ public int getNumOfLeaves() { netlock.readLock().lock(); @@ -463,7 +527,7 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + /** Return the distance between two nodes * It is assumed that the distance from one node to its parent is 1 * The distance between two nodes is calculated by summing up their distances @@ -509,8 +573,8 @@ public class NetworkTopology { return Integer.MAX_VALUE; } return dis+2; - } - + } + /** Check if two nodes are on the same rack * @param node1 one node (can be null) * @param node2 another node (can be null) @@ -525,13 +589,44 @@ public class NetworkTopology { netlock.readLock().lock(); try { - return node1.getParent()==node2.getParent(); + return isSameParents(node1, node2); } finally { netlock.readLock().unlock(); } } - - final private static Random r = new Random(); + + /** + * Check if network topology is aware of NodeGroup + */ + public boolean isNodeGroupAware() { + return false; + } + + /** + * Return false directly as not aware of NodeGroup, to be override in sub-class + */ + public boolean isOnSameNodeGroup(Node node1, Node node2) { + return false; + } + + /** + * Compare the parents of each node for equality + * + *

To be overridden in subclasses for specific NetworkTopology + * implementations, as alternative to overriding the full + * {@link #isOnSameRack(Node, Node)} method. + * + * @param node1 the first node to compare + * @param node2 the second node to compare + * @return true if their parents are equal, false otherwise + * + * @see #isOnSameRack(Node, Node) + */ + protected boolean isSameParents(Node node1, Node node2) { + return node1.getParent()==node2.getParent(); + } + + final protected static Random r = new Random(); /** randomly choose one node from scope * if scope starts with ~, choose one from the all nodes except for the * ones in scope; otherwise, choose one from scope @@ -550,7 +645,7 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + private Node chooseRandom(String scope, String excludedScope){ if (excludedScope != null) { if (scope.startsWith(excludedScope)) { @@ -579,7 +674,25 @@ public class NetworkTopology { int leaveIndex = r.nextInt(numOfDatanodes); return innerNode.getLeaf(leaveIndex, node); } - + + /** return leaves in scope + * @param scope a path string + * @return leaves nodes under specific scope + */ + public List getLeaves(String scope) { + Node node = getNode(scope); + List leafNodes = new ArrayList(); + if (!(node instanceof InnerNode)) { + leafNodes.add(node); + } else { + InnerNode innerNode = (InnerNode) node; + for (int i=0;iscope but not in excludedNodes * if scope starts with ~, return the number of nodes that are not * in scope and excludedNodes; @@ -619,7 +732,7 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + /** convert a network tree to a string */ public String toString() { // print the number of racks @@ -640,13 +753,12 @@ public class NetworkTopology { return tree.toString(); } - /* swap two array items */ - static private void swap(Node[] nodes, int i, int j) { + /** swap two array items */ + static protected void swap(Node[] nodes, int i, int j) { Node tempNode; tempNode = nodes[j]; nodes[j] = nodes[i]; nodes[i] = tempNode; - } /** Sort nodes array by their distances to reader @@ -697,4 +809,5 @@ public class NetworkTopology { swap(nodes, 0, r.nextInt(nodes.length)); } } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index db7eaed643..0fd1e7d51d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -199,10 +199,10 @@ public class DatanodeInfo extends DatanodeID implements Node { this.xceiverCount = xceiverCount; } - /** rack name */ + /** network location */ public synchronized String getNetworkLocation() {return location;} - /** Sets the rack name */ + /** Sets the network location */ public synchronized void setNetworkLocation(String location) { this.location = NodeBase.normalize(location); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2aee0eb5ee..c58b857ebf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -66,6 +67,8 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.HostsFileReader; @@ -108,7 +111,7 @@ public class DatanodeManager { = new TreeMap(); /** Cluster network topology */ - private final NetworkTopology networktopology = new NetworkTopology(); + private final NetworkTopology networktopology; /** Host names to datanode descriptors mapping. */ private final Host2NodesMap host2DatanodeMap = new Host2NodesMap(); @@ -134,6 +137,12 @@ public class DatanodeManager { ) throws IOException { this.namesystem = namesystem; this.blockManager = blockManager; + + Class networkTopologyClass = + conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, + NetworkTopology.class, NetworkTopology.class); + networktopology = (NetworkTopology) ReflectionUtils.newInstance( + networkTopologyClass, conf); this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); @@ -206,13 +215,22 @@ public class DatanodeManager { public void sortLocatedBlocks(final String targethost, final List locatedblocks) { //sort the blocks - final DatanodeDescriptor client = getDatanodeByHost(targethost); + // As it is possible for the separation of node manager and datanode, + // here we should get node but not datanode only . + Node client = getDatanodeByHost(targethost); + if (client == null) { + List hosts = new ArrayList (1); + hosts.add(targethost); + String rName = dnsToSwitchMapping.resolve(hosts).get(0); + if (rName != null) + client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost); + } for (LocatedBlock b : locatedblocks) { networktopology.pseudoSortByDistance(client, b.getLocations()); // Move decommissioned datanodes to the bottom Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR); - } + } } CyclicIteration getDatanodeCyclicIteration(