HDFS-2167. Move dnsToSwitchMapping and hostsReader from FSNamesystem to DatanodeManager.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1149455 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-07-22 04:20:21 +00:00
parent c187bdc0a2
commit 233a7aa34f
9 changed files with 617 additions and 551 deletions

View File

@ -583,6 +583,9 @@ Trunk (unreleased changes)
to DelegationTokenIdentifier. (szetszwo)
HDFS-1774. Small optimization to FSDataset. (Uma Maheswara Rao G via eli)
HDFS-2167. Move dnsToSwitchMapping and hostsReader from FSNamesystem to
DatanodeManager. (szetszwo)
OPTIMIZATIONS

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
@ -33,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -106,10 +109,8 @@ public long getExcessBlocksCount() {
private final DatanodeManager datanodeManager;
//
// Store blocks-->datanodedescriptor(s) map of corrupt replicas
//
private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
//
// Keeps a Collection for every named machine containing
@ -136,34 +137,34 @@ public long getExcessBlocksCount() {
public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
private final PendingReplicationBlocks pendingReplications;
// The maximum number of replicas allowed for a block
/** The maximum number of replicas allowed for a block */
public final int maxReplication;
// How many outgoing replication streams a given node should have at one time
/** The maximum number of outgoing replication streams
* a given node should have at one time
*/
public int maxReplicationStreams;
// Minimum copies needed or else write is disallowed
/** Minimum copies needed or else write is disallowed */
public final int minReplication;
// Default number of replicas
/** Default number of replicas */
public final int defaultReplication;
// How many entries are returned by getCorruptInodes()
/** The maximum number of entries returned by getCorruptInodes() */
final int maxCorruptFilesReturned;
// variable to enable check for enough racks
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
/**
* Last block index used for replication work.
*/
/** Last block index used for replication work. */
private int replIndex = 0;
// for block replicas placement
public final BlockPlacementPolicy replicator;
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
namesystem = fsn;
datanodeManager = new DatanodeManager(fsn);
datanodeManager = new DatanodeManager(fsn, conf);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
replicator = BlockPlacementPolicy.getInstance(
blockplacement = BlockPlacementPolicy.getInstance(
conf, namesystem, datanodeManager.getNetworkTopology());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@ -220,6 +221,19 @@ public DatanodeManager getDatanodeManager() {
return datanodeManager;
}
/** @return the BlockPlacementPolicy */
public BlockPlacementPolicy getBlockPlacementPolicy() {
return blockplacement;
}
/** Set BlockPlacementPolicy */
public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
if (newpolicy == null) {
throw new HadoopIllegalArgumentException("newpolicy == null");
}
this.blockplacement = newpolicy;
}
public void metaSave(PrintWriter out) {
//
// Dump contents of neededReplication
@ -551,7 +565,7 @@ public void removeDatanode(final DatanodeDescriptor node) {
}
}
void removeFromInvalidates(String storageID, Block block) {
private void removeFromInvalidates(String storageID, Block block) {
Collection<Block> v = recentInvalidateSets.get(storageID);
if (v != null && v.remove(block)) {
pendingDeletionBlocksCount--;
@ -921,7 +935,7 @@ private boolean computeReplicationWorkForBlock(Block block, int priority) {
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the Inode itself.
DatanodeDescriptor targets[] =
replicator.chooseTarget(fileINode, additionalReplRequired,
blockplacement.chooseTarget(fileINode, additionalReplRequired,
srcNode, containingNodes, block.getNumBytes());
if(targets.length == 0)
return false;
@ -1021,7 +1035,7 @@ public DatanodeDescriptor[] chooseTarget(final String src,
final HashMap<Node, Node> excludedNodes,
final long blocksize) throws IOException {
// choose targets for the new block to be allocated.
final DatanodeDescriptor targets[] = replicator.chooseTarget(
final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
src, numOfReplicas, client, excludedNodes, blocksize);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
@ -1240,7 +1254,7 @@ void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report)
}
}
void reportDiff(DatanodeDescriptor dn,
private void reportDiff(DatanodeDescriptor dn,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
@ -1670,7 +1684,7 @@ public void processOverReplicatedBlock(Block block, short replication,
}
}
namesystem.chooseExcessReplicates(nonExcess, block, replication,
addedNode, delNodeHint, replicator);
addedNode, delNodeHint, blockplacement);
}
public void addToExcessReplicate(DatanodeInfo dn, Block block) {
@ -1694,7 +1708,7 @@ public void addToExcessReplicate(DatanodeInfo dn, Block block) {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
private void removeStoredBlock(Block block, DatanodeDescriptor node) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+ block + " from " + node.getName());
@ -1881,7 +1895,8 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
* On stopping decommission, check if the node has excess replicas.
* If there are any excess replicas, call processOverReplicatedBlock()
*/
public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) {
private void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
final Iterator<? extends Block> it = srcNode.getBlockIterator();
while(it.hasNext()) {
final Block block = it.next();
@ -1900,7 +1915,7 @@ public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode
* Return true if there are any blocks on this node that have not
* yet reached their replication factor. Otherwise returns false.
*/
public boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
@ -2022,7 +2037,7 @@ private int getReplication(Block block) {
}
/** Remove a datanode from the invalidatesSet */
public void removeFromInvalidates(String storageID) {
private void removeFromInvalidates(String storageID) {
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
if (blocks != null) {
pendingDeletionBlocksCount -= blocks.size();
@ -2086,28 +2101,6 @@ private int invalidateWorkForOneNode(String nodeId) {
namesystem.writeUnlock();
}
}
//Returns the number of racks over which a given block is replicated
//decommissioning/decommissioned nodes are not counted. corrupt replicas
//are also ignored
public int getNumberOfRacks(Block b) {
HashSet<String> rackSet = new HashSet<String>(0);
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(b);
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
String rackName = cur.getNetworkLocation();
if (!rackSet.contains(rackName)) {
rackSet.add(rackName);
}
}
}
}
return rackSet.size();
}
boolean blockHasEnoughRacks(Block b) {
if (!this.shouldCheckForEnoughRacks) {
@ -2209,4 +2202,50 @@ public BlockIterator getCorruptReplicaBlockIterator() {
return neededReplications
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
/**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
// Check to see if all blocks in this decommissioned
// node has reached their target replication factor.
if (node.isDecommissionInProgress()) {
if (!isReplicationInProgress(node)) {
node.setDecommissioned();
LOG.info("Decommission complete for node " + node.getName());
}
}
return node.isDecommissioned();
}
/** Start decommissioning the specified datanode. */
void startDecommission(DatanodeDescriptor node) throws IOException {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName() + " with " +
node.numBlocks() + " blocks.");
synchronized (namesystem.heartbeats) {
namesystem.updateStats(node, false);
node.startDecommission();
namesystem.updateStats(node, true);
}
node.decommissioningStatus.setStartTime(now());
// all the blocks that reside on this node have to be replicated.
checkDecommissionStateInternal(node);
}
}
/** Stop decommissioning the specified datanodes. */
void stopDecommission(DatanodeDescriptor node) throws IOException {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning node " + node.getName());
synchronized (namesystem.heartbeats) {
namesystem.updateStats(node, false);
node.stopDecommission();
namesystem.updateStats(node, true);
}
processOverReplicatedBlocksOnReCommission(node);
}
}
}

View File

@ -18,8 +18,14 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,11 +34,23 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Manage datanodes, include decommission and other activities.
@ -49,9 +67,29 @@ public class DatanodeManager {
/** Host names to datanode descriptors mapping. */
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
private final DNSToSwitchMapping dnsToSwitchMapping;
/** Read include/exclude files*/
private final HostsFileReader hostsReader;
DatanodeManager(final FSNamesystem namesystem) {
DatanodeManager(final FSNamesystem namesystem, final Configuration conf
) throws IOException {
this.namesystem = namesystem;
this.hostsReader = new HostsFileReader(
conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
// If the dns to switch mapping supports cache, resolve network
// locations of those hosts in the include list and store the mapping
// in the cache; so future calls to resolve will be fast.
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}
}
private Daemon decommissionthread = null;
@ -93,7 +131,7 @@ public DatanodeDescriptor getDatanodeByHost(final String host) {
}
/** Add a datanode. */
public void addDatanode(final DatanodeDescriptor node) {
private void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
@ -112,7 +150,7 @@ public void addDatanode(final DatanodeDescriptor node) {
}
/** Physically remove node from datanodeMap. */
public void wipeDatanode(final DatanodeID node) throws IOException {
private void wipeDatanode(final DatanodeID node) throws IOException {
final String key = node.getStorageID();
synchronized (namesystem.datanodeMap) {
host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
@ -123,4 +161,380 @@ public void wipeDatanode(final DatanodeID node) throws IOException {
+ " is removed from datanodeMap.");
}
}
/* Resolve a node's network location */
private void resolveNetworkLocation (DatanodeDescriptor node) {
List<String> names = new ArrayList<String>(1);
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
// get the node's IP address
names.add(node.getHost());
} else {
// get the node's host name
String hostName = node.getHostName();
int colon = hostName.indexOf(":");
hostName = (colon==-1)?hostName:hostName.substring(0,colon);
names.add(hostName);
}
// resolve its network location
List<String> rName = dnsToSwitchMapping.resolve(names);
String networkLocation;
if (rName == null) {
LOG.error("The resolve call returned null! Using " +
NetworkTopology.DEFAULT_RACK + " for host " + names);
networkLocation = NetworkTopology.DEFAULT_RACK;
} else {
networkLocation = rName.get(0);
}
node.setNetworkLocation(networkLocation);
}
private boolean inHostsList(DatanodeID node, String ipAddr) {
return checkInList(node, ipAddr, hostsReader.getHosts(), false);
}
private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
return checkInList(node, ipAddr, hostsReader.getExcludedHosts(), true);
}
/**
* Remove an already decommissioned data node who is neither in include nor
* exclude hosts lists from the the list of live or dead nodes. This is used
* to not display an already decommssioned data node to the operators.
* The operation procedure of making a already decommissioned data node not
* to be displayed is as following:
* <ol>
* <li>
* Host must have been in the include hosts list and the include hosts list
* must not be empty.
* </li>
* <li>
* Host is decommissioned by remaining in the include hosts list and added
* into the exclude hosts list. Name node is updated with the new
* information by issuing dfsadmin -refreshNodes command.
* </li>
* <li>
* Host is removed from both include hosts and exclude hosts lists. Name
* node is updated with the new informationby issuing dfsamin -refreshNodes
* command.
* <li>
* </ol>
*
* @param nodeList
* , array list of live or dead nodes.
*/
public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
// If the include list is empty, any nodes are welcomed and it does not
// make sense to exclude any nodes from the cluster. Therefore, no remove.
if (hostsReader.getHosts().isEmpty()) {
return;
}
for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
&& node.isDecommissioned()) {
// Include list is not empty, an existing datanode does not appear
// in both include or exclude lists and it has been decommissioned.
// Remove it from the node list.
it.remove();
}
}
}
/**
* Check if the given node (of DatanodeID or ipAddress) is in the (include or
* exclude) list. If ipAddress in null, check only based upon the given
* DatanodeID. If ipAddress is not null, the ipAddress should refers to the
* same host that given DatanodeID refers to.
*
* @param node, the host DatanodeID
* @param ipAddress, if not null, should refers to the same host
* that DatanodeID refers to
* @param hostsList, the list of hosts in the include/exclude file
* @param isExcludeList, boolean, true if this is the exclude list
* @return boolean, if in the list
*/
private static boolean checkInList(final DatanodeID node,
final String ipAddress,
final Set<String> hostsList,
final boolean isExcludeList) {
final InetAddress iaddr;
if (ipAddress != null) {
try {
iaddr = InetAddress.getByName(ipAddress);
} catch (UnknownHostException e) {
LOG.warn("Unknown ip address: " + ipAddress, e);
return isExcludeList;
}
} else {
try {
iaddr = InetAddress.getByName(node.getHost());
} catch (UnknownHostException e) {
LOG.warn("Unknown host: " + node.getHost(), e);
return isExcludeList;
}
}
// if include list is empty, host is in include list
if ( (!isExcludeList) && (hostsList.isEmpty()) ){
return true;
}
return // compare ipaddress(:port)
(hostsList.contains(iaddr.getHostAddress().toString()))
|| (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+ node.getPort()))
// compare hostname(:port)
|| (hostsList.contains(iaddr.getHostName()))
|| (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
|| ((node instanceof DatanodeInfo) && hostsList
.contains(((DatanodeInfo) node).getHostName()));
}
/**
* Decommission the node if it is in exclude list.
*/
private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
throws IOException {
// If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg, ipAddr)) {
namesystem.getBlockManager().startDecommission(nodeReg);
}
}
/**
* Generate new storage ID.
*
* @return unique storage ID
*
* Note: that collisions are still possible if somebody will try
* to bring in a data storage from a different cluster.
*/
private String newStorageID() {
String newID = null;
while(newID == null) {
newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
if (namesystem.datanodeMap.get(newID) != null)
newID = null;
}
return newID;
}
public void registerDatanode(DatanodeRegistration nodeReg
) throws IOException {
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
// But if not, use address passed by the data-node.
dnAddress = nodeReg.getHost();
}
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
if (!inHostsList(nodeReg, dnAddress)) {
throw new DisallowedDatanodeException(nodeReg);
}
String hostName = nodeReg.getHost();
// update the datanode's name with ip:port
DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
nodeReg.getStorageID(),
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
nodeReg.exportedKeys = namesystem.getBlockKeys();
NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+ "node registration from " + nodeReg.getName()
+ " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+ "node from name: " + nodeN.getName());
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
namesystem.removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
}
if (nodeS != null) {
if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+ "node restarted.");
}
} else {
// nodeS is found
/* The registering datanode is a replacement node for the existing
data storage, which from now on will be served by a new node.
If this message repeats, both nodes might have same storageID
by (insanely rare) random chance. User needs to restart one of the
nodes with its data cleared (or user can just remove the StorageID
value in "VERSION" file under the data directory of the datanode,
but this is might not work if VERSION file format has changed
*/
NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+ "node " + nodeS.getName()
+ " is replaced by " + nodeReg.getName() +
" with the same storageID " +
nodeReg.getStorageID());
}
// update cluster map
getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setHostName(hostName);
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
synchronized(namesystem.heartbeats) {
if( !namesystem.heartbeats.contains(nodeS)) {
namesystem.heartbeats.add(nodeS);
//update its timestamp
nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
nodeS.isAlive = true;
}
}
checkDecommissioning(nodeS, dnAddress);
return;
}
// this is a new datanode serving a new data storage
if (nodeReg.getStorageID().equals("")) {
// this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS
nodeReg.storageID = newStorageID();
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
+ "new storageID " + nodeReg.getStorageID() + " assigned.");
}
}
// register new datanode
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
resolveNetworkLocation(nodeDescr);
addDatanode(nodeDescr);
checkDecommissioning(nodeDescr, dnAddress);
// also treat the registration message as a heartbeat
synchronized(namesystem.heartbeats) {
namesystem.heartbeats.add(nodeDescr);
nodeDescr.isAlive = true;
// no need to update its timestamp
// because its is done when the descriptor is created
}
}
/** Reread include/exclude files. */
public void refreshHostsReader(Configuration conf) throws IOException {
// Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list.
if (conf == null) {
conf = new HdfsConfiguration();
}
hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
hostsReader.refresh();
}
/**
* Rereads the config to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
* checks if any of the hosts have changed states:
* 1. Added to hosts --> no further work needed here.
* 2. Removed from hosts --> mark AdminState as decommissioned.
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
public void refreshDatanodes() throws IOException {
for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node, null)) {
node.setDisallowed(true); // case 2.
} else {
if (inExcludedHostsList(node, null)) {
namesystem.getBlockManager().startDecommission(node); // case 3.
} else {
namesystem.getBlockManager().stopDecommission(node); // case 4.
}
}
}
}
/** For generating datanode reports */
public List<DatanodeDescriptor> getDatanodeListForReport(
final DatanodeReportType type) {
boolean listLiveNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.LIVE;
boolean listDeadNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.DEAD;
HashMap<String, String> mustList = new HashMap<String, String>();
if (listDeadNodes) {
//first load all the nodes listed in include and exclude files.
Iterator<String> it = hostsReader.getHosts().iterator();
while (it.hasNext()) {
mustList.put(it.next(), "");
}
it = hostsReader.getExcludedHosts().iterator();
while (it.hasNext()) {
mustList.put(it.next(), "");
}
}
ArrayList<DatanodeDescriptor> nodes = null;
synchronized (namesystem.datanodeMap) {
nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() +
mustList.size());
Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn = it.next();
boolean isDead = namesystem.isDatanodeDead(dn);
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
nodes.add(dn);
}
//Remove any form of the this datanode in include/exclude lists.
try {
InetAddress inet = InetAddress.getByName(dn.getHost());
// compare hostname(:port)
mustList.remove(inet.getHostName());
mustList.remove(inet.getHostName()+":"+dn.getPort());
// compare ipaddress(:port)
mustList.remove(inet.getHostAddress().toString());
mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
} catch ( UnknownHostException e ) {
mustList.remove(dn.getName());
mustList.remove(dn.getHost());
LOG.warn(e);
}
}
}
if (listDeadNodes) {
Iterator<String> it = mustList.keySet().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn =
new DatanodeDescriptor(new DatanodeID(it.next()));
dn.setLastUpdate(0);
nodes.add(dn);
}
}
return nodes;
}
}

View File

@ -35,9 +35,11 @@ class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
private final FSNamesystem fsnamesystem;
private final BlockManager blockmanager;
DecommissionManager(FSNamesystem namesystem) {
this.fsnamesystem = namesystem;
this.blockmanager = fsnamesystem.getBlockManager();
}
/** Periodically check decommission status. */
@ -88,7 +90,7 @@ private void check() {
if (d.isDecommissionInProgress()) {
try {
fsnamesystem.checkDecommissionStateInternal(d);
blockmanager.checkDecommissionStateInternal(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
}

View File

@ -31,7 +31,6 @@
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -102,6 +101,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@ -133,11 +133,8 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -145,8 +142,6 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
@ -313,10 +308,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information
private DNSToSwitchMapping dnsToSwitchMapping;
private HostsFileReader hostsReader;
private long maxFsObjects = 0; // maximum number of fs objects
@ -376,9 +367,6 @@ private void initialize(Configuration conf, FSImage fsImage)
this.dir = new FSDirectory(fsImage, this, conf);
}
this.safeMode = new SafeModeInfo(conf);
this.hostsReader = new HostsFileReader(
conf.get(DFSConfigKeys.DFS_HOSTS,""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE,""));
if (isBlockTokenEnabled) {
blockTokenSecretManager = new BlockTokenSecretManager(true,
blockKeyUpdateInterval, blockTokenLifetime);
@ -407,19 +395,6 @@ void activate(Configuration conf) throws IOException {
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
/* If the dns to switch mapping supports cache, resolve network
* locations of those hosts in the include list,
* and store the mapping in the cache; so future calls to resolve
* will be fast.
*/
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}
registerMXBean();
DefaultMetricsSystem.instance().register(this);
}
@ -768,7 +743,7 @@ BlocksWithLocations getBlocks(DatanodeID datanode, long size)
*
* @return current access keys
*/
ExportedBlockKeys getBlockKeys() {
public ExportedBlockKeys getBlockKeys() {
return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
: ExportedBlockKeys.DUMMY_KEYS;
}
@ -1836,8 +1811,8 @@ LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
}
// choose new datanodes.
final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
src, numAdditionalNodes, clientnode, chosen, true,
final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
excludes, preferredblocksize);
final LocatedBlock lb = new LocatedBlock(blk, targets);
if (isBlockTokenEnabled) {
@ -2780,162 +2755,12 @@ public void registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
writeLock();
try {
registerDatanodeInternal(nodeReg);
getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
checkSafeMode();
} finally {
writeUnlock();
}
}
/** @see #registerDatanode(DatanodeRegistration) */
public void registerDatanodeInternal(DatanodeRegistration nodeReg)
throws IOException {
assert hasWriteLock();
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
// But if not, use address passed by the data-node.
dnAddress = nodeReg.getHost();
}
// check if the datanode is allowed to be connect to the namenode
if (!verifyNodeRegistration(nodeReg, dnAddress)) {
throw new DisallowedDatanodeException(nodeReg);
}
String hostName = nodeReg.getHost();
// update the datanode's name with ip:port
DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
nodeReg.getStorageID(),
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
nodeReg.exportedKeys = getBlockKeys();
NameNode.stateChangeLog.info(
"BLOCK* NameSystem.registerDatanode: "
+ "node registration from " + nodeReg.getName()
+ " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN =
blockManager.getDatanodeManager().getDatanodeByHost(nodeReg.getName());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+ "node from name: " + nodeN.getName());
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
removeDatanode(nodeN);
// physically remove node from datanodeMap
blockManager.getDatanodeManager().wipeDatanode(nodeN);
nodeN = null;
}
if (nodeS != null) {
if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+ "node restarted.");
}
} else {
// nodeS is found
/* The registering datanode is a replacement node for the existing
data storage, which from now on will be served by a new node.
If this message repeats, both nodes might have same storageID
by (insanely rare) random chance. User needs to restart one of the
nodes with its data cleared (or user can just remove the StorageID
value in "VERSION" file under the data directory of the datanode,
but this is might not work if VERSION file format has changed
*/
NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+ "node " + nodeS.getName()
+ " is replaced by " + nodeReg.getName() +
" with the same storageID " +
nodeReg.getStorageID());
}
// update cluster map
blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setHostName(hostName);
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
synchronized(heartbeats) {
if( !heartbeats.contains(nodeS)) {
heartbeats.add(nodeS);
//update its timestamp
nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
nodeS.isAlive = true;
}
}
checkDecommissioning(nodeS, dnAddress);
return;
}
// this is a new datanode serving a new data storage
if (nodeReg.getStorageID().equals("")) {
// this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS
nodeReg.storageID = newStorageID();
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
+ "new storageID " + nodeReg.getStorageID() + " assigned.");
}
}
// register new datanode
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
resolveNetworkLocation(nodeDescr);
blockManager.getDatanodeManager().addDatanode(nodeDescr);
checkDecommissioning(nodeDescr, dnAddress);
// also treat the registration message as a heartbeat
synchronized(heartbeats) {
heartbeats.add(nodeDescr);
nodeDescr.isAlive = true;
// no need to update its timestamp
// because its is done when the descriptor is created
}
checkSafeMode();
}
/* Resolve a node's network location */
private void resolveNetworkLocation (DatanodeDescriptor node) {
assert hasWriteLock();
List<String> names = new ArrayList<String>(1);
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
// get the node's IP address
names.add(node.getHost());
} else {
// get the node's host name
String hostName = node.getHostName();
int colon = hostName.indexOf(":");
hostName = (colon==-1)?hostName:hostName.substring(0,colon);
names.add(hostName);
}
// resolve its network location
List<String> rName = dnsToSwitchMapping.resolve(names);
String networkLocation;
if (rName == null) {
LOG.error("The resolve call returned null! Using " +
NetworkTopology.DEFAULT_RACK + " for host " + names);
networkLocation = NetworkTopology.DEFAULT_RACK;
} else {
networkLocation = rName.get(0);
}
node.setNetworkLocation(networkLocation);
}
/**
* Get registrationID for datanodes based on the namespaceID.
@ -2946,26 +2771,8 @@ private void resolveNetworkLocation (DatanodeDescriptor node) {
public String getRegistrationID() {
return Storage.getRegistrationID(dir.fsImage.getStorage());
}
/**
* Generate new storage ID.
*
* @return unique storage ID
*
* Note: that collisions are still possible if somebody will try
* to bring in a data storage from a different cluster.
*/
private String newStorageID() {
String newID = null;
while(newID == null) {
newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
if (datanodeMap.get(newID) != null)
newID = null;
}
return newID;
}
private boolean isDatanodeDead(DatanodeDescriptor node) {
public boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(now() - heartbeatExpireInterval));
}
@ -3078,7 +2885,7 @@ DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
return null;
}
private void updateStats(DatanodeDescriptor node, boolean isAdded) {
public void updateStats(DatanodeDescriptor node, boolean isAdded) {
//
// The statistics are protected by the heartbeat lock
// For decommissioning/decommissioned nodes, only used capacity
@ -3280,10 +3087,10 @@ public void setNodeReplicationLimit(int limit) {
/**
* Remove a datanode descriptor.
* @param nodeID datanode ID.
* @throws IOException
* @throws UnregisteredNodeException
*/
public void removeDatanode(DatanodeID nodeID)
throws IOException {
public void removeDatanode(final DatanodeID nodeID
) throws UnregisteredNodeException {
writeLock();
try {
DatanodeDescriptor nodeInfo = getDatanode(nodeID);
@ -3664,83 +3471,23 @@ public int getTotalLoad() {
}
int getNumberOfDatanodes(DatanodeReportType type) {
return getDatanodeListForReport(type).size();
}
private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
DatanodeReportType type) {
readLock();
try {
boolean listLiveNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.LIVE;
boolean listDeadNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.DEAD;
HashMap<String, String> mustList = new HashMap<String, String>();
if (listDeadNodes) {
//first load all the nodes listed in include and exclude files.
Iterator<String> it = hostsReader.getHosts().iterator();
while (it.hasNext()) {
mustList.put(it.next(), "");
}
it = hostsReader.getExcludedHosts().iterator();
while (it.hasNext()) {
mustList.put(it.next(), "");
}
}
ArrayList<DatanodeDescriptor> nodes = null;
synchronized (datanodeMap) {
nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
mustList.size());
Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn = it.next();
boolean isDead = isDatanodeDead(dn);
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
nodes.add(dn);
}
//Remove any form of the this datanode in include/exclude lists.
try {
InetAddress inet = InetAddress.getByName(dn.getHost());
// compare hostname(:port)
mustList.remove(inet.getHostName());
mustList.remove(inet.getHostName()+":"+dn.getPort());
// compare ipaddress(:port)
mustList.remove(inet.getHostAddress().toString());
mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
} catch ( UnknownHostException e ) {
mustList.remove(dn.getName());
mustList.remove(dn.getHost());
LOG.warn(e);
}
}
}
if (listDeadNodes) {
Iterator<String> it = mustList.keySet().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn =
new DatanodeDescriptor(new DatanodeID(it.next()));
dn.setLastUpdate(0);
nodes.add(dn);
}
}
return nodes;
try {
return getBlockManager().getDatanodeManager().getDatanodeListForReport(
type).size();
} finally {
readUnlock();
}
}
public DatanodeInfo[] datanodeReport( DatanodeReportType type)
throws AccessControlException {
DatanodeInfo[] datanodeReport(final DatanodeReportType type
) throws AccessControlException {
checkSuperuserPrivilege();
readLock();
try {
checkSuperuserPrivilege();
ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
final DatanodeManager dm = getBlockManager().getDatanodeManager();
final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
DatanodeInfo[] arr = new DatanodeInfo[results.size()];
for (int i=0; i<arr.length; i++) {
arr[i] = new DatanodeInfo(results.get(i));
@ -3804,8 +3551,8 @@ public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
ArrayList<DatanodeDescriptor> dead) {
readLock();
try {
ArrayList<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
final List<DatanodeDescriptor> results = getBlockManager(
).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);
for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (isDatanodeDead(node))
@ -3836,44 +3583,6 @@ private void datanodeDump(PrintWriter out) {
}
}
/**
* Start decommissioning the specified datanode.
*/
private void startDecommission(DatanodeDescriptor node)
throws IOException {
assert hasWriteLock();
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName() + " with " +
node.numBlocks() + " blocks.");
synchronized (heartbeats) {
updateStats(node, false);
node.startDecommission();
updateStats(node, true);
}
node.decommissioningStatus.setStartTime(now());
// all the blocks that reside on this node have to be replicated.
checkDecommissionStateInternal(node);
}
}
/**
* Stop decommissioning the specified datanodes.
*/
public void stopDecommission(DatanodeDescriptor node)
throws IOException {
assert hasWriteLock();
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning node " + node.getName());
synchronized (heartbeats) {
updateStats(node, false);
node.stopDecommission();
updateStats(node, true);
}
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
}
public Date getStartTime() {
return new Date(systemStart);
}
@ -3898,85 +3607,6 @@ short adjustReplication(short replication) {
return replication;
}
/**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
assert hasWriteLock();
//
// Check to see if all blocks in this decommissioned
// node has reached their target replication factor.
//
if (node.isDecommissionInProgress()) {
if (!blockManager.isReplicationInProgress(node)) {
node.setDecommissioned();
LOG.info("Decommission complete for node " + node.getName());
}
}
return node.isDecommissioned();
}
/**
* Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
*/
private boolean inHostsList(DatanodeID node, String ipAddr) {
Set<String> hostsList = hostsReader.getHosts();
return checkInList(node, ipAddr, hostsList, false);
}
private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
Set<String> excludeList = hostsReader.getExcludedHosts();
return checkInList(node, ipAddr, excludeList, true);
}
/**
* Check if the given node (of DatanodeID or ipAddress) is in the (include or
* exclude) list. If ipAddress in null, check only based upon the given
* DatanodeID. If ipAddress is not null, the ipAddress should refers to the
* same host that given DatanodeID refers to.
*
* @param node, DatanodeID, the host DatanodeID
* @param ipAddress, if not null, should refers to the same host
* that DatanodeID refers to
* @param hostsList, the list of hosts in the include/exclude file
* @param isExcludeList, boolean, true if this is the exclude list
* @return boolean, if in the list
*/
private boolean checkInList(DatanodeID node, String ipAddress,
Set<String> hostsList, boolean isExcludeList) {
InetAddress iaddr = null;
try {
if (ipAddress != null) {
iaddr = InetAddress.getByName(ipAddress);
} else {
iaddr = InetAddress.getByName(node.getHost());
}
}catch (UnknownHostException e) {
LOG.warn("Unknown host in host list: "+ipAddress);
// can't resolve the host name.
if (isExcludeList){
return true;
} else {
return false;
}
}
// if include list is empty, host is in include list
if ( (!isExcludeList) && (hostsList.isEmpty()) ){
return true;
}
return // compare ipaddress(:port)
(hostsList.contains(iaddr.getHostAddress().toString()))
|| (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+ node.getPort()))
// compare hostname(:port)
|| (hostsList.contains(iaddr.getHostName()))
|| (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
|| ((node instanceof DatanodeInfo) && hostsList
.contains(((DatanodeInfo) node).getHostName()));
}
/**
* Rereads the config to get hosts and exclude list file names.
@ -3989,29 +3619,10 @@ private boolean checkInList(DatanodeID node, String ipAddress,
*/
public void refreshNodes(Configuration conf) throws IOException {
checkSuperuserPrivilege();
// Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list
if (conf == null)
conf = new HdfsConfiguration();
hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS,""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
hostsReader.refresh();
getBlockManager().getDatanodeManager().refreshHostsReader(conf);
writeLock();
try {
for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
it.hasNext();) {
DatanodeDescriptor node = it.next();
// Check if not include.
if (!inHostsList(node, null)) {
node.setDisallowed(true); // case 2.
} else {
if (inExcludedHostsList(node, null)) {
startDecommission(node); // case 3.
} else {
stopDecommission(node); // case 4.
}
}
}
getBlockManager().getDatanodeManager().refreshDatanodes();
} finally {
writeUnlock();
}
@ -4021,27 +3632,7 @@ void finalizeUpgrade() throws IOException {
checkSuperuserPrivilege();
getFSImage().finalizeUpgrade();
}
/**
* Checks if the node is not on the hosts list. If it is not, then
* it will be disallowed from registering.
*/
private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
assert hasWriteLock();
return inHostsList(nodeReg, ipAddr);
}
/**
* Decommission the node if it is in exclude list.
*/
private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
throws IOException {
assert hasWriteLock();
// If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg, ipAddr)) {
startDecommission(nodeReg);
}
}
/**
* Get data node by storage ID.
@ -4050,7 +3641,8 @@ private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
* @return DatanodeDescriptor or null if the node is not found.
* @throws IOException
*/
public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
public DatanodeDescriptor getDatanode(DatanodeID nodeID
) throws UnregisteredNodeException {
assert hasReadOrWriteLock();
UnregisteredNodeException e = null;
DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
@ -5411,8 +5003,8 @@ public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
try {
ArrayList<DatanodeDescriptor> decommissioningNodes =
new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.LIVE);
final List<DatanodeDescriptor> results = getBlockManager(
).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (node.isDecommissionInProgress()) {
@ -5824,49 +5416,8 @@ public String getBlockPoolId() {
public BlockManager getBlockManager() {
return blockManager;
}
/**
* Remove an already decommissioned data node who is neither in include nor
* exclude hosts lists from the the list of live or dead nodes. This is used
* to not display an already decommssioned data node to the operators.
* The operation procedure of making a already decommissioned data node not
* to be displayed is as following:
* <ol>
* <li>
* Host must have been in the include hosts list and the include hosts list
* must not be empty.
* </li>
* <li>
* Host is decommissioned by remaining in the include hosts list and added
* into the exclude hosts list. Name node is updated with the new
* information by issuing dfsadmin -refreshNodes command.
* </li>
* <li>
* Host is removed from both include hosts and exclude hosts lists. Name
* node is updated with the new informationby issuing dfsamin -refreshNodes
* command.
* <li>
* </ol>
*
* @param nodeList
* , array list of live or dead nodes.
*/
void removeDecomNodeFromList(ArrayList<DatanodeDescriptor> nodeList) {
// If the include list is empty, any nodes are welcomed and it does not
// make sense to exclude any nodes from the cluster. Therefore, no remove.
if (hostsReader.getHosts().isEmpty()) {
return;
}
for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
&& node.isDecommissioned()) {
// Include list is not empty, an existing datanode does not appear
// in both include or exclude lists and it has been decommissioned.
// Remove it from the node list.
it.remove();
}
}
void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
}
}

View File

@ -59,13 +59,13 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
@ -282,7 +282,8 @@ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
do {
Thread.sleep(1000);
int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b.getLocalBlock());
int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(),
b.getLocalBlock());
curRacks = r[0];
curReplicas = r[1];
curNeededReplicas = r[2];

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
public class BlockManagerTestUtil {
/**
* @return a tuple of the replica state (number racks, number live
* replicas, and number needed replicas) for the given block.
*/
public static int[] getReplicaInfo(final FSNamesystem namesystem, final Block b) {
final BlockManager bm = namesystem.getBlockManager();
namesystem.readLock();
try {
return new int[]{getNumberOfRacks(bm, b),
bm.countNodes(b).liveReplicas(),
bm.neededReplications.contains(b) ? 1 : 0};
} finally {
namesystem.readUnlock();
}
}
/**
* @return the number of racks over which a given block is replicated
* decommissioning/decommissioned nodes are not counted. corrupt replicas
* are also ignored
*/
private static int getNumberOfRacks(final BlockManager blockmanager,
final Block b) {
final Set<String> rackSet = new HashSet<String>(0);
final Collection<DatanodeDescriptor> corruptNodes =
blockmanager.corruptReplicas.getNodes(b);
for (Iterator<DatanodeDescriptor> it = blockmanager.blocksMap.nodeIterator(b);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
String rackName = cur.getNetworkLocation();
if (!rackSet.contains(rackName)) {
rackSet.add(rackName);
}
}
}
}
return rackSet.size();
}
}

View File

@ -68,7 +68,7 @@ public class TestReplicationPolicy extends TestCase {
throw (RuntimeException)new RuntimeException().initCause(e);
}
final BlockManager bm = namenode.getNamesystem().getBlockManager();
replicator = bm.replicator;
replicator = bm.getBlockPlacementPolicy();
cluster = bm.getDatanodeManager().getNetworkTopology();
// construct network topology
for(int i=0; i<NUM_OF_DATANODES; i++) {

View File

@ -60,21 +60,6 @@ public static void refreshBlockCounts(NameNode namenode) {
public static Server getRpcServer(NameNode namenode) {
return namenode.server;
}
/**
* Return a tuple of the replica state (number racks, number live
* replicas, and number needed replicas) for the given block.
* @param namenode to proxy the invocation to.
*/
public static int[] getReplicaInfo(NameNode namenode, Block b) {
FSNamesystem ns = namenode.getNamesystem();
ns.readLock();
int[] r = {ns.blockManager.getNumberOfRacks(b),
ns.blockManager.countNodes(b).liveReplicas(),
ns.blockManager.neededReplications.contains(b) ? 1 : 0};
ns.readUnlock();
return r;
}
public static String getLeaseHolderForPath(NameNode namenode, String path) {
return namenode.getNamesystem().leaseManager.getLeaseByPath(path).getHolder();