HDFS-2147. Move cluster network topology to block management and fix some javac warnings.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1148112 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2011-07-19 00:26:21 +00:00
parent 2a49e1a3a4
commit c3f6575ca4
12 changed files with 138 additions and 101 deletions
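At its core, the change moves the NetworkTopology instance (and, with it, random-datanode selection and located-block sorting) out of FSNamesystem and behind BlockManager and its DatanodeManager. A hedged before-and-after sketch of the access path, using only names that appear in the hunks below:

    // before this commit: topology was a public field on FSNamesystem
    NetworkTopology nt = namesystem.clusterMap;

    // after this commit: reached through the block-management layer
    NetworkTopology nt2 = namesystem.getBlockManager()
        .getDatanodeManager().getNetworkTopology();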

View File

@@ -569,6 +569,9 @@ Trunk (unreleased changes)
HDFS-2157. Improve header comment in o.a.h.hdfs.server.namenode.NameNode.
(atm via eli)
HDFS-2147. Move cluster network topology to block management and fix some
javac warnings. (szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -31,6 +32,8 @@
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +50,7 @@
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.Node;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -55,8 +59,9 @@
*/
@InterfaceAudience.Private
public class BlockManager {
// Default initial capacity and load factor of map
public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;
static final Log LOG = LogFactory.getLog(BlockManager.class);
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
private final FSNamesystem namesystem;
@@ -104,7 +109,7 @@ public long getExcessBlocksCount() {
//
// Store blocks-->datanodedescriptor(s) map of corrupt replicas
//
CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
//
// Keeps a Collection for every named machine containing
@@ -112,7 +117,7 @@ public long getExcessBlocksCount() {
// on the machine in question.
// Mapping: StorageID -> ArrayList<Block>
//
Map<String, Collection<Block>> recentInvalidateSets =
private final Map<String, Collection<Block>> recentInvalidateSets =
new TreeMap<String, Collection<Block>>();
//
@@ -128,22 +133,22 @@ public long getExcessBlocksCount() {
// Store set of Blocks that need to be replicated 1 or more times.
// We also store pending replication-orders.
//
public UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
private PendingReplicationBlocks pendingReplications;
public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
private final PendingReplicationBlocks pendingReplications;
// The maximum number of replicas allowed for a block
public int maxReplication;
public final int maxReplication;
// How many outgoing replication streams a given node should have at one time
public int maxReplicationStreams;
// Minimum copies needed or else write is disallowed
public int minReplication;
public final int minReplication;
// Default number of replicas
public int defaultReplication;
public final int defaultReplication;
// How many entries are returned by getCorruptInodes()
int maxCorruptFilesReturned;
final int maxCorruptFilesReturned;
// variable to enable check for enough racks
boolean shouldCheckForEnoughRacks = true;
final boolean shouldCheckForEnoughRacks;
/**
* Last block index used for replication work.
@@ -152,28 +157,18 @@ public long getExcessBlocksCount() {
Random r = new Random();
// for block replicas placement
public BlockPlacementPolicy replicator;
public final BlockPlacementPolicy replicator;
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
}
BlockManager(FSNamesystem fsn, Configuration conf, int capacity)
throws IOException {
namesystem = fsn;
datanodeManager = new DatanodeManager(fsn);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
replicator = BlockPlacementPolicy.getInstance(
conf, namesystem, datanodeManager.getNetworkTopology());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
setConfigurationParameters(conf);
blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
datanodeManager = new DatanodeManager(fsn);
}
void setConfigurationParameters(Configuration conf) throws IOException {
this.replicator = BlockPlacementPolicy.getInstance(
conf,
namesystem,
namesystem.clusterMap);
this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
@@ -541,6 +536,22 @@ public void verifyReplication(String src,
minReplication);
}
/** Remove a datanode. */
public void removeDatanode(final DatanodeDescriptor node) {
final Iterator<? extends Block> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
}
node.resetBlocks();
removeFromInvalidates(node.getStorageID());
datanodeManager.getNetworkTopology().remove(node);
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + node.getName());
}
}
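The new BlockManager.removeDatanode consolidates work FSNamesystem previously did inline across removeDatanode and unprotectedRemoveDatanode (see the FSNamesystem hunks further down). A minimal caller sketch, assuming the namesystem write lock is held as before:

    // hypothetical caller; mirrors FSNamesystem.removeDatanode below
    namesystem.writeLock();
    try {
      blockManager.removeDatanode(nodeInfo); // replicas, invalidates, topology
    } finally {
      namesystem.writeUnlock();
    }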
void removeFromInvalidates(String storageID, Block block) {
Collection<Block> v = recentInvalidateSets.get(storageID);
if (v != null && v.remove(block)) {
@@ -1001,6 +1012,29 @@ private boolean computeReplicationWorkForBlock(Block block, int priority) {
return true;
}
/**
* Choose target datanodes according to the replication policy.
* @throws IOException if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
*/
public DatanodeDescriptor[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
final HashMap<Node, Node> excludedNodes,
final long blocksize) throws IOException {
// choose targets for the new block to be allocated.
final DatanodeDescriptor targets[] = replicator.chooseTarget(
src, numOfReplicas, client, excludedNodes, blocksize);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
minReplication + ". There are "
+ getDatanodeManager().getNetworkTopology().getNumOfLeaves()
+ " datanode(s) running but "+excludedNodes.size() +
" node(s) are excluded in this operation.");
}
return targets;
}
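A minimal call-site sketch for the new wrapper; the variable names mirror FSNamesystem.getAdditionalBlock below, and the excluded-nodes map is assumed to be non-null since its size is reported in the exception message:

    // hypothetical call site; compare FSNamesystem.getAdditionalBlock below
    final HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
    final DatanodeDescriptor[] targets = blockManager.chooseTarget(
        src, replication, clientNode, excludedNodes, blockSize);
    // throws IOException if fewer than minReplication targets are found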
/**
* Parse the data-nodes the block belongs to and choose one,
* which will be the replication source.

View File

@@ -57,7 +57,7 @@ public void remove() {
private GSet<Block, BlockInfo> blocks;
BlocksMap(int initialCapacity, float loadFactor) {
BlocksMap(final float loadFactor) {
this.capacity = computeCapacity();
this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
}
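The initial-capacity parameter is gone; BlocksMap now sizes itself through computeCapacity(), which this hunk does not show. A rough illustration of the idea, assuming capacity is derived from a fixed fraction of the maximum heap (the real method and its constants may differ):

    // illustrative only; the actual BlocksMap.computeCapacity() is not in this diff
    private static int computeCapacity() {
      final double fraction = 0.02;  // assumption: ~2% of the heap for the map
      final int entryBytes = 48;     // assumption: rough bytes per block entry
      final long maxMemory = Runtime.getRuntime().maxMemory();
      final long cap = (long) (maxMemory * fraction) / entryBytes;
      return (int) Math.min(cap, Integer.MAX_VALUE);
    }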

View File

@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -25,8 +27,11 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Daemon;
/**
@@ -39,6 +44,10 @@ public class DatanodeManager {
final FSNamesystem namesystem;
/** Cluster network topology */
private final NetworkTopology networktopology = new NetworkTopology();
/** Host names to datanode descriptors mapping. */
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
DatanodeManager(final FSNamesystem namesystem) {
@@ -60,6 +69,24 @@ void close() {
if (decommissionthread != null) decommissionthread.interrupt();
}
/** @return the network topology. */
public NetworkTopology getNetworkTopology() {
return networktopology;
}
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
final List<LocatedBlock> locatedblocks) {
//sort the blocks
final DatanodeDescriptor client = getDatanodeByHost(targethost);
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned datanodes to the bottom
Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
}
}
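The sort is two-phase: pseudoSortByDistance orders each block's replicas by topological distance from the reader, then DFSUtil.DECOM_COMPARATOR pushes decommissioned nodes to the end regardless of distance. A hedged usage sketch, where 'dm' is an assumed handle to the DatanodeManager:

    // hypothetical reader-side use
    final List<LocatedBlock> located = blocks.getLocatedBlocks();
    dm.sortLocatedBlocks("reader.example.com", located);
    // the first location of each block is now a nearby,
    // non-decommissioned replica when one exists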
/** @return the datanode descriptor for the host. */
public DatanodeDescriptor getDatanodeByHost(final String host) {
return host2DatanodeMap.getDatanodeByHost(host);
@@ -74,10 +101,12 @@ public void addDatanode(final DatanodeDescriptor node) {
host2DatanodeMap.remove(
namesystem.datanodeMap.put(node.getStorageID(), node));
}
host2DatanodeMap.add(node);
networktopology.add(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".unprotectedAddDatanode: "
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
+ "node " + node.getName() + " is added to datanodeMap.");
}
}

View File

@@ -138,7 +138,6 @@
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -240,7 +239,7 @@ private static final void logAuditEvent(UserGroupInformation ugi,
// Stores the correct file name hierarchy
//
public FSDirectory dir;
public BlockManager blockManager;
BlockManager blockManager;
// Block pool ID used by this namenode
String blockPoolId;
@@ -318,8 +317,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
private volatile SafeModeInfo safeMode; // safe mode information
/** datanode network topology */
public NetworkTopology clusterMap = new NetworkTopology();
private DNSToSwitchMapping dnsToSwitchMapping;
private HostsFileReader hostsReader;
@@ -876,15 +873,8 @@ LocatedBlocks getBlockLocations(String clientMachine, String src,
FileNotFoundException, UnresolvedLinkException, IOException {
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
if (blocks != null) {
//sort the blocks
final DatanodeDescriptor client =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
for (LocatedBlock b : blocks.getLocatedBlocks()) {
clusterMap.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned datanodes to the bottom
Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
}
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
}
return blocks;
}
@@ -1774,16 +1764,8 @@ public LocatedBlock getAdditionalBlock(String src,
}
// choose targets for the new block to be allocated.
DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
final DatanodeDescriptor targets[] = blockManager.chooseTarget(
src, replication, clientNode, excludedNodes, blockSize);
if (targets.length < blockManager.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
blockManager.minReplication + ". There are "
+clusterMap.getNumOfLeaves()+" datanode(s) running"
+" but "+excludedNodes.size() +
" node(s) are excluded in this operation.");
}
// Allocate a new block and record it in the INode.
writeLock();
@@ -2881,14 +2863,14 @@ nodes with its data cleared (or user can just remove the StorageID
nodeReg.getStorageID());
}
// update cluster map
clusterMap.remove(nodeS);
blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setHostName(hostName);
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
clusterMap.add(nodeS);
blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
synchronized(heartbeats) {
@@ -2919,7 +2901,6 @@ nodes with its data cleared (or user can just remove the StorageID
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
resolveNetworkLocation(nodeDescr);
blockManager.getDatanodeManager().addDatanode(nodeDescr);
clusterMap.add(nodeDescr);
checkDecommissioning(nodeDescr, dnAddress);
// also treat the registration message as a heartbeat
@@ -3336,27 +3317,11 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
}
}
Iterator<? extends Block> it = nodeInfo.getBlockIterator();
while(it.hasNext()) {
blockManager.removeStoredBlock(it.next(), nodeInfo);
}
unprotectedRemoveDatanode(nodeInfo);
clusterMap.remove(nodeInfo);
blockManager.removeDatanode(nodeInfo);
checkSafeMode();
}
void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
assert hasWriteLock();
nodeDescr.resetBlocks();
blockManager.removeFromInvalidates(nodeDescr.getStorageID());
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.unprotectedRemoveDatanode: "
+ nodeDescr.getName() + " is out of service now.");
}
}
FSImage getFSImage() {
return dir.fsImage;
}
@@ -4104,14 +4069,6 @@ public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
return node;
}
/** Choose a random datanode
*
* @return a randomly chosen datanode
*/
DatanodeDescriptor getRandomDatanode() {
return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
}
/**
* SafeModeInfo contains information related to the safe mode.
* <p>
@@ -4278,9 +4235,10 @@ synchronized void leave(boolean checkForUpgrades) {
}
reached = -1;
safeMode = null;
final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
NameNode.stateChangeLog.info("STATE* Network topology has "
+clusterMap.getNumOfRacks()+" racks and "
+clusterMap.getNumOfLeaves()+ " datanodes");
+ nt.getNumOfRacks() + " racks and "
+ nt.getNumOfLeaves() + " datanodes");
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
+blockManager.neededReplications.size()+" blocks");
}
@@ -5866,6 +5824,11 @@ public String getBlockPoolId() {
return blockPoolId;
}
/** @return the block manager. */
public BlockManager getBlockManager() {
return blockManager;
}
/**
* Remove an already decommissioned data node that is neither in include nor
* exclude hosts lists from the list of live or dead nodes. This is used

View File

@@ -32,14 +32,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.znerd.xmlenc.XMLOutputter;
@@ -61,7 +61,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
(Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
final UserGroupInformation ugi = getUGI(request, conf);
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final DatanodeID datanode = namenode.getNamesystem().getRandomDatanode();
final DatanodeID datanode = NamenodeJspHelper.getRandomDatanode(namenode);
try {
final URI uri = createRedirectUri("/getFileChecksum", ugi, datanode,
request, namenode);

View File

@@ -86,7 +86,7 @@ private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i)
if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
// pick a random datanode
NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
return nn.getNamesystem().getRandomDatanode();
return NamenodeJspHelper.getRandomDatanode(nn);
}
return JspHelper.bestNode(blks);
}

View File

@@ -66,7 +66,8 @@ public Object run() throws Exception {
namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
final short minReplication = namesystem.getMinReplication();
new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
new NamenodeFsck(conf, nn,
NamenodeJspHelper.getNetworkTopology(nn), pmap, out,
totalDatanodes, minReplication, remoteAddress).fsck();
return null;

View File

@@ -89,7 +89,6 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
@@ -1424,10 +1423,6 @@ public InetSocketAddress getHttpAddress() {
return httpAddress;
}
NetworkTopology getNetworkTopology() {
return this.namesystem.clusterMap;
}
/**
* Verify that configured directories exist, then
* Interactively confirm that formatting is desired

View File

@@ -48,6 +48,8 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ServletUtil;
@@ -368,13 +370,25 @@ public Token<DelegationTokenIdentifier> run() throws IOException {
return token == null ? null : token.encodeToUrlString();
}
/** @return the network topology. */
static NetworkTopology getNetworkTopology(final NameNode namenode) {
return namenode.getNamesystem().getBlockManager().getDatanodeManager(
).getNetworkTopology();
}
/** @return a randomly chosen datanode. */
static DatanodeDescriptor getRandomDatanode(final NameNode namenode) {
return (DatanodeDescriptor)getNetworkTopology(namenode).chooseRandom(
NodeBase.ROOT);
}
static void redirectToRandomDataNode(ServletContext context,
HttpServletRequest request, HttpServletResponse resp) throws IOException,
InterruptedException {
final NameNode nn = (NameNode) context.getAttribute("name.node");
final Configuration conf = (Configuration) context
.getAttribute(JspHelper.CURRENT_CONF);
final DatanodeID datanode = nn.getNamesystem().getRandomDatanode();
final DatanodeID datanode = getRandomDatanode(nn);
UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
String tokenString = getDelegationToken(nn, request, conf, ugi);
// if the user is defined, get a delegation token and stringify it

View File

@@ -32,7 +32,6 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -68,9 +67,9 @@ public class TestReplicationPolicy extends TestCase {
e.printStackTrace();
throw (RuntimeException)new RuntimeException().initCause(e);
}
FSNamesystem fsNamesystem = namenode.getNamesystem();
replicator = fsNamesystem.blockManager.replicator;
cluster = fsNamesystem.clusterMap;
final BlockManager bm = namenode.getNamesystem().getBlockManager();
replicator = bm.replicator;
cluster = bm.getDatanodeManager().getNetworkTopology();
// construct network topology
for(int i=0; i<NUM_OF_DATANODES; i++) {
cluster.add(dataNodes[i]);

View File

@@ -27,7 +27,6 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
public class TestUnderReplicatedBlocks extends TestCase {
public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
@@ -44,11 +43,11 @@ public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
// remove one replica from the blocksMap so block becomes under-replicated
// but the block does not get put into the under-replicated blocks queue
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
DatanodeDescriptor dn = namesystem.blockManager.blocksMap.nodeIterator(b.getLocalBlock()).next();
namesystem.blockManager.addToInvalidates(b.getLocalBlock(), dn);
namesystem.blockManager.blocksMap.removeNode(b.getLocalBlock(), dn);
DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next();
bm.addToInvalidates(b.getLocalBlock(), dn);
bm.blocksMap.removeNode(b.getLocalBlock(), dn);
// increment this file's replication factor
FsShell shell = new FsShell(conf);