From c3f6575ca44e8ad803d0b46991472465b595cdeb Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Tue, 19 Jul 2011 00:26:21 +0000
Subject: [PATCH] HDFS-2147. Move cluster network topology to block management
 and fix some javac warnings.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1148112 13f79535-47bb-0310-9956-ffa450edef68
---
 hdfs/CHANGES.txt                               |  3 +
 .../server/blockmanagement/BlockManager.java   | 88 +++++++++++++------
 .../server/blockmanagement/BlocksMap.java      |  2 +-
 .../blockmanagement/DatanodeManager.java       | 31 ++++++-
 .../hdfs/server/namenode/FSNamesystem.java     | 67 ++++----------
 .../server/namenode/FileChecksumServlets.java  |  6 +-
 .../hdfs/server/namenode/FileDataServlet.java  |  2 +-
 .../hdfs/server/namenode/FsckServlet.java      |  3 +-
 .../hadoop/hdfs/server/namenode/NameNode.java  |  5 --
 .../server/namenode/NamenodeJspHelper.java     | 16 +++-
 .../TestReplicationPolicy.java                 |  7 +-
 .../TestUnderReplicatedBlocks.java             |  9 +-
 12 files changed, 138 insertions(+), 101 deletions(-)

diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index ddf15eb966..0c3be6a77e 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -569,6 +569,9 @@ Trunk (unreleased changes)
     HDFS-2157. Improve header comment in o.a.h.hdfs.server.namenode.NameNode.
     (atm via eli)
 
+    HDFS-2147. Move cluster network topology to block management and fix some
+    javac warnings.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0f5e5afcc4..5ad198de13 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -31,6 +32,8 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +50,7 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.Node;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -55,8 +59,9 @@
  */
 @InterfaceAudience.Private
 public class BlockManager {
-  // Default initial capacity and load factor of map
-  public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;
+  static final Log LOG = LogFactory.getLog(BlockManager.class);
+
+  /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
 
   private final FSNamesystem namesystem;
@@ -104,7 +109,7 @@ public long getExcessBlocksCount() {
   //
   // Store blocks-->datanodedescriptor(s) map of corrupt replicas
   //
-  CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+  private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   //
   // Keeps a Collection for every named machine containing
@@ -112,7 +117,7 @@ public long getExcessBlocksCount() {
   // on the machine in question.
   // Mapping: StorageID -> ArrayList<Block>
   //
-  Map<String, Collection<Block>> recentInvalidateSets =
+  private final Map<String, Collection<Block>> recentInvalidateSets =
     new TreeMap<String, Collection<Block>>();
 
   //
@@ -128,22 +133,22 @@ public long getExcessBlocksCount() {
   // Store set of Blocks that need to be replicated 1 or more times.
   // We also store pending replication-orders.
   //
-  public UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private PendingReplicationBlocks pendingReplications;
+  public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+  private final PendingReplicationBlocks pendingReplications;
 
   //  The maximum number of replicas allowed for a block
-  public int maxReplication;
+  public final int maxReplication;
   //  How many outgoing replication streams a given node should have at one time
   public int maxReplicationStreams;
   // Minimum copies needed or else write is disallowed
-  public int minReplication;
+  public final int minReplication;
   // Default number of replicas
-  public int defaultReplication;
+  public final int defaultReplication;
   // How many entries are returned by getCorruptInodes()
-  int maxCorruptFilesReturned;
+  final int maxCorruptFilesReturned;
   // variable to enable check for enough racks
-  boolean shouldCheckForEnoughRacks = true;
+  final boolean shouldCheckForEnoughRacks;
 
   /**
    * Last block index used for replication work.
@@ -152,28 +157,18 @@ public long getExcessBlocksCount() {
   Random r = new Random();
 
   // for block replicas placement
-  public BlockPlacementPolicy replicator;
+  public final BlockPlacementPolicy replicator;
 
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
-    this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
-  }
-
-  BlockManager(FSNamesystem fsn, Configuration conf, int capacity)
-      throws IOException {
     namesystem = fsn;
+    datanodeManager = new DatanodeManager(fsn);
+
+    blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
+    replicator = BlockPlacementPolicy.getInstance(
+        conf, namesystem, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
-    setConfigurationParameters(conf);
-    blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
-    datanodeManager = new DatanodeManager(fsn);
-  }
-
-  void setConfigurationParameters(Configuration conf) throws IOException {
-    this.replicator = BlockPlacementPolicy.getInstance(
-        conf,
-        namesystem,
-        namesystem.clusterMap);
 
     this.maxCorruptFilesReturned = conf.getInt(
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
@@ -541,6 +536,22 @@ public void verifyReplication(String src,
                           minReplication);
   }
 
+  /** Remove a datanode. */
+  public void removeDatanode(final DatanodeDescriptor node) {
+    final Iterator<? extends Block> it = node.getBlockIterator();
+    while(it.hasNext()) {
+      removeStoredBlock(it.next(), node);
+    }
+
+    node.resetBlocks();
+    removeFromInvalidates(node.getStorageID());
+    datanodeManager.getNetworkTopology().remove(node);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("remove datanode " + node.getName());
+    }
+  }
+
   void removeFromInvalidates(String storageID, Block block) {
     Collection<Block> v = recentInvalidateSets.get(storageID);
     if (v != null && v.remove(block)) {
@@ -1001,6 +1012,29 @@ private boolean computeReplicationWorkForBlock(Block block, int priority) {
     return true;
   }
 
+  /**
+   * Choose target datanodes according to the replication policy.
+   * @throws IOException if the number of targets < minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+   */
+  public DatanodeDescriptor[] chooseTarget(final String src,
+      final int numOfReplicas, final DatanodeDescriptor client,
+      final HashMap<Node, Node> excludedNodes,
+      final long blocksize) throws IOException {
+    // choose targets for the new block to be allocated.
+    final DatanodeDescriptor targets[] = replicator.chooseTarget(
+        src, numOfReplicas, client, excludedNodes, blocksize);
+    if (targets.length < minReplication) {
+      throw new IOException("File " + src + " could only be replicated to " +
+          targets.length + " nodes, instead of " +
+          minReplication + ". There are " +
+          getDatanodeManager().getNetworkTopology().getNumOfLeaves() +
+          " datanode(s) running but "+excludedNodes.size() +
+          " node(s) are excluded in this operation.");
+    }
+    return targets;
+  }
+
   /**
    * Parse the data-nodes the block belongs to and choose one,
    * which will be the replication source.
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 7a2854c816..a41c167b0c 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -57,7 +57,7 @@ public void remove() {
 
   private GSet<Block, BlockInfo> blocks;
 
-  BlocksMap(int initialCapacity, float loadFactor) {
+  BlocksMap(final float loadFactor) {
     this.capacity = computeCapacity();
     this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
   }
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index efd8a3240d..42d1d8a1e4 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -25,8 +27,11 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.Daemon;
 
 /**
@@ -39,6 +44,10 @@ public class DatanodeManager {
 
   final FSNamesystem namesystem;
 
+  /** Cluster network topology */
+  private final NetworkTopology networktopology = new NetworkTopology();
+
+  /** Host names to datanode descriptors mapping. */
   private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
 
   DatanodeManager(final FSNamesystem namesystem) {
@@ -60,6 +69,24 @@ void close() {
     if (decommissionthread != null) decommissionthread.interrupt();
   }
 
+  /** @return the network topology. */
+  public NetworkTopology getNetworkTopology() {
+    return networktopology;
+  }
+
+  /** Sort the located blocks by the distance to the target host. */
+  public void sortLocatedBlocks(final String targethost,
+      final List<LocatedBlock> locatedblocks) {
+    //sort the blocks
+    final DatanodeDescriptor client = getDatanodeByHost(targethost);
+    for (LocatedBlock b : locatedblocks) {
+      networktopology.pseudoSortByDistance(client, b.getLocations());
+
+      // Move decommissioned datanodes to the bottom
+      Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
+    }
+  }
+
   /** @return the datanode descriptor for the host. */
   public DatanodeDescriptor getDatanodeByHost(final String host) {
     return host2DatanodeMap.getDatanodeByHost(host);
@@ -74,10 +101,12 @@ public void addDatanode(final DatanodeDescriptor node) {
       host2DatanodeMap.remove(
           namesystem.datanodeMap.put(node.getStorageID(), node));
     }
+
     host2DatanodeMap.add(node);
+    networktopology.add(node);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(getClass().getSimpleName() + ".unprotectedAddDatanode: "
+      LOG.debug(getClass().getSimpleName() + ".addDatanode: "
           + "node " + node.getName() + " is added to datanodeMap.");
     }
   }
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 564daf0c7d..8b70d0e06e 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -138,7 +138,6 @@
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -240,7 +239,7 @@ private static final void logAuditEvent(UserGroupInformation ugi,
   // Stores the correct file name hierarchy
   //
   public FSDirectory dir;
-  public BlockManager blockManager;
+  BlockManager blockManager;
 
   // Block pool ID used by this namenode
   String blockPoolId;
@@ -318,8 +317,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
 
   private volatile SafeModeInfo safeMode;  // safe mode information
 
-  /** datanode network toplogy */
-  public NetworkTopology clusterMap = new NetworkTopology();
   private DNSToSwitchMapping dnsToSwitchMapping;
 
   private HostsFileReader hostsReader;
@@ -876,15 +873,8 @@ LocatedBlocks getBlockLocations(String clientMachine, String src,
       FileNotFoundException, UnresolvedLinkException, IOException {
     LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
     if (blocks != null) {
-      //sort the blocks
-      final DatanodeDescriptor client =
-          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
-      for (LocatedBlock b : blocks.getLocatedBlocks()) {
-        clusterMap.pseudoSortByDistance(client, b.getLocations());
-
-        // Move decommissioned datanodes to the bottom
-        Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
-      }
+      blockManager.getDatanodeManager().sortLocatedBlocks(
+          clientMachine, blocks.getLocatedBlocks());
     }
     return blocks;
   }
@@ -1774,16 +1764,8 @@ public LocatedBlock getAdditionalBlock(String src,
     }
 
     // choose targets for the new block to be allocated.
-    DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
+    final DatanodeDescriptor targets[] = blockManager.chooseTarget(
         src, replication, clientNode, excludedNodes, blockSize);
-    if (targets.length < blockManager.minReplication) {
-      throw new IOException("File " + src + " could only be replicated to " +
-          targets.length + " nodes, instead of " +
-          blockManager.minReplication + ". There are "
-          +clusterMap.getNumOfLeaves()+" datanode(s) running"
-          +" but "+excludedNodes.size() +
-          " node(s) are excluded in this operation.");
-    }
 
     // Allocate a new block and record it in the INode.
     writeLock();
@@ -2881,14 +2863,14 @@ nodes with its data cleared (or user can just remove the StorageID
                                         nodeReg.getStorageID());
       }
       // update cluster map
-      clusterMap.remove(nodeS);
+      blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
       nodeS.updateRegInfo(nodeReg);
       nodeS.setHostName(hostName);
       nodeS.setDisallowed(false); // Node is in the include list
 
       // resolve network location
       resolveNetworkLocation(nodeS);
-      clusterMap.add(nodeS);
+      blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
 
       // also treat the registration message as a heartbeat
       synchronized(heartbeats) {
@@ -2919,7 +2901,6 @@ nodes with its data cleared (or user can just remove the StorageID
       = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
     resolveNetworkLocation(nodeDescr);
     blockManager.getDatanodeManager().addDatanode(nodeDescr);
-    clusterMap.add(nodeDescr);
     checkDecommissioning(nodeDescr, dnAddress);
 
     // also treat the registration message as a heartbeat
@@ -3336,27 +3317,11 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
       }
     }
 
-    Iterator<? extends Block> it = nodeInfo.getBlockIterator();
-    while(it.hasNext()) {
-      blockManager.removeStoredBlock(it.next(), nodeInfo);
-    }
-    unprotectedRemoveDatanode(nodeInfo);
-    clusterMap.remove(nodeInfo);
+    blockManager.removeDatanode(nodeInfo);
     checkSafeMode();
   }
 
-  void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
-    assert hasWriteLock();
-    nodeDescr.resetBlocks();
-    blockManager.removeFromInvalidates(nodeDescr.getStorageID());
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug(
-          "BLOCK* NameSystem.unprotectedRemoveDatanode: "
-              + nodeDescr.getName() + " is out of service now.");
-    }
-  }
-
   FSImage getFSImage() {
     return dir.fsImage;
   }
@@ -4104,14 +4069,6 @@ public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
     return node;
   }
 
-  /** Choose a random datanode
-   *
-   * @return a randomly chosen datanode
-   */
-  DatanodeDescriptor getRandomDatanode() {
-    return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
-  }
-
   /**
    * SafeModeInfo contains information related to the safe mode.
   * <p>
@@ -4278,9 +4235,10 @@ synchronized void leave(boolean checkForUpgrades) {
         }
         reached = -1;
         safeMode = null;
+        final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
         NameNode.stateChangeLog.info("STATE* Network topology has "
-            +clusterMap.getNumOfRacks()+" racks and "
-            +clusterMap.getNumOfLeaves()+ " datanodes");
+            + nt.getNumOfRacks() + " racks and "
+            + nt.getNumOfLeaves() + " datanodes");
         NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
             +blockManager.neededReplications.size()+" blocks");
       }
@@ -5866,6 +5824,11 @@ public String getBlockPoolId() {
     return blockPoolId;
   }
 
+  /** @return the block manager. */
+  public BlockManager getBlockManager() {
+    return blockManager;
+  }
+
   /**
    * Remove an already decommissioned data node who is neither in include nor
    * exclude hosts lists from the the list of live or dead nodes. This is used
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
index 10fb1e498b..17d30103b3 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
@@ -32,14 +32,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.znerd.xmlenc.XMLOutputter;
@@ -61,7 +61,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
         (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
     final UserGroupInformation ugi = getUGI(request, conf);
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
-    final DatanodeID datanode = namenode.getNamesystem().getRandomDatanode();
+    final DatanodeID datanode = NamenodeJspHelper.getRandomDatanode(namenode);
     try {
       final URI uri = createRedirectUri("/getFileChecksum", ugi, datanode,
           request, namenode);
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
index 4af3609db7..0b9f6d3ea2 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
@@ -86,7 +86,7 @@ private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i)
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
-      return nn.getNamesystem().getRandomDatanode();
+      return NamenodeJspHelper.getRandomDatanode(nn);
     }
     return JspHelper.bestNode(blks);
   }
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
index 7f8d753f64..95b4e6594d 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
@@ -66,7 +66,8 @@ public Object run() throws Exception {
           namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
       final short minReplication = namesystem.getMinReplication();
 
-      new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
+      new NamenodeFsck(conf, nn,
+          NamenodeJspHelper.getNetworkTopology(nn), pmap, out,
           totalDatanodes, minReplication, remoteAddress).fsck();
 
       return null;
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index cd5447245e..0756e9c582 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -89,7 +89,6 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -1424,10 +1423,6 @@ public InetSocketAddress getHttpAddress() {
     return httpAddress;
   }
 
-  NetworkTopology getNetworkTopology() {
-    return this.namesystem.clusterMap;
-  }
-
   /**
    * Verify that configured directories exist, then
    * Interactively confirm that formatting is desired
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
index 1181097a1f..7135806a8d 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
@@ -48,6 +48,8 @@
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ServletUtil;
@@ -368,13 +370,25 @@ public Token<DelegationTokenIdentifier> run() throws IOException {
     return token == null ? null : token.encodeToUrlString();
   }
 
+  /** @return the network topology. */
+  static NetworkTopology getNetworkTopology(final NameNode namenode) {
+    return namenode.getNamesystem().getBlockManager().getDatanodeManager(
+        ).getNetworkTopology();
+  }
+
+  /** @return a randomly chosen datanode. */
+  static DatanodeDescriptor getRandomDatanode(final NameNode namenode) {
+    return (DatanodeDescriptor)getNetworkTopology(namenode).chooseRandom(
+        NodeBase.ROOT);
+  }
+
   static void redirectToRandomDataNode(ServletContext context,
       HttpServletRequest request, HttpServletResponse resp) throws IOException,
       InterruptedException {
     final NameNode nn = (NameNode) context.getAttribute("name.node");
     final Configuration conf = (Configuration) context
         .getAttribute(JspHelper.CURRENT_CONF);
-    final DatanodeID datanode = nn.getNamesystem().getRandomDatanode();
+    final DatanodeID datanode = getRandomDatanode(nn);
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
     String tokenString = getDelegationToken(nn, request, conf, ugi);
     // if the user is defined, get a delegation token and stringify it
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 9ba21ead0d..83d90ad10f 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -68,9 +67,9 @@ public class TestReplicationPolicy extends TestCase {
       e.printStackTrace();
       throw (RuntimeException)new RuntimeException().initCause(e);
     }
-    FSNamesystem fsNamesystem = namenode.getNamesystem();
-    replicator = fsNamesystem.blockManager.replicator;
-    cluster = fsNamesystem.clusterMap;
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    replicator = bm.replicator;
+    cluster = bm.getDatanodeManager().getNetworkTopology();
     // construct network topology
     for(int i=0; i