HDFS-2108. Move datanode heartbeat handling from namenode package to blockmanagement package.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1154042 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-08-04 22:55:48 +00:00
parent 23762da4fa
commit 7fac946ac9
20 changed files with 615 additions and 436 deletions

View File

@ -635,6 +635,9 @@ Trunk (unreleased changes)
HDFS-2225. Refactor file management so it's not in classes which should
be generic. (Ivan Kelly via todd)
HDFS-2108. Move datanode heartbeat handling from namenode package to
blockmanagement package. (szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -565,7 +565,6 @@ public void renewLease(String clientName) throws AccessControlException,
* <li> [3] contains number of under replicated blocks in the system.</li>
* <li> [4] contains number of blocks with a corrupt replica. </li>
* <li> [5] contains number of blocks without any good replicas left. </li>
* <li> [5] contains number of blocks without any good replicas left. </li>
* <li> [6] contains the total used space of the block pool. </li>
* </ul>
* Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
@ -46,6 +44,7 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -95,11 +94,6 @@ public boolean isBlockTokenEnabled() {
return isBlockTokenEnabled;
}
/** get the block key update interval */
public long getBlockKeyUpdateInterval() {
return blockKeyUpdateInterval;
}
/** get the BlockTokenSecretManager */
public BlockTokenSecretManager getBlockTokenSecretManager() {
return blockTokenSecretManager;
@ -140,7 +134,8 @@ public long getExcessBlocksCount() {
public final BlocksMap blocksMap;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
@ -177,7 +172,7 @@ public long getExcessBlocksCount() {
/** The maximum number of outgoing replication streams
* a given node should have at one time
*/
public int maxReplicationStreams;
int maxReplicationStreams;
/** Minimum copies needed or else write is disallowed */
public final int minReplication;
/** Default number of replicas */
@ -217,22 +212,12 @@ public void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException
setBlockToken(l);
}
}
/**
* Update access keys.
*/
public void updateBlockKey() throws IOException {
this.blockTokenSecretManager.updateKeys();
synchronized (namesystem.heartbeats) {
for (DatanodeDescriptor nodeInfo : namesystem.heartbeats) {
nodeInfo.needKeyUpdate = true;
}
}
}
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
namesystem = fsn;
datanodeManager = new DatanodeManager(fsn, conf);
datanodeManager = new DatanodeManager(this, fsn, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance(
conf, namesystem, datanodeManager.getNetworkTopology());
@ -387,6 +372,11 @@ public void metaSave(PrintWriter out) {
getDatanodeManager().datanodeDump(out);
}
/** @return maxReplicationStreams */
public int getMaxReplicationStreams() {
return maxReplicationStreams;
}
/**
* @param block
* @return true if the block has minimum replicas
@ -587,7 +577,8 @@ public LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
}
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
final DatanodeDescriptor[] locations = uc.getExpectedLocations();
return namesystem.createLocatedBlock(uc, locations, pos, false);
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, locations, pos, false);
}
// get block locations
@ -613,7 +604,8 @@ public LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
machines[j++] = d;
}
}
return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, machines, pos, isCorrupt);
}
/**
@ -685,8 +677,8 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
}
/** Remove a datanode. */
public void removeDatanode(final DatanodeDescriptor node) {
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
final Iterator<? extends Block> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
@ -694,11 +686,6 @@ public void removeDatanode(final DatanodeDescriptor node) {
node.resetBlocks();
removeFromInvalidates(node.getStorageID());
datanodeManager.getNetworkTopology().remove(node);
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + node.getName());
}
}
private void removeFromInvalidates(String storageID, Block block) {
@ -887,7 +874,7 @@ public int getUnderReplicatedNotMissingBlocks() {
* @param nodesToProcess number of datanodes to schedule deletion work
* @return total number of block for deletion
*/
public int computeInvalidateWork(int nodesToProcess) {
int computeInvalidateWork(int nodesToProcess) {
int numOfNodes = recentInvalidateSets.size();
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
@ -927,7 +914,7 @@ public int computeInvalidateWork(int nodesToProcess) {
*
* @return number of blocks scheduled for replication during this iteration.
*/
public int computeReplicationWork(int blocksToProcess) throws IOException {
private int computeReplicationWork(int blocksToProcess) throws IOException {
// Choose the blocks to be replicated
List<List<Block>> blocksToReplicate =
chooseUnderReplicatedBlocks(blocksToProcess);
@ -2047,7 +2034,7 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
* On stopping decommission, check if the node has excess replicas.
* If there are any excess replicas, call processOverReplicatedBlock()
*/
private void processOverReplicatedBlocksOnReCommission(
void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
final Iterator<? extends Block> it = srcNode.getBlockIterator();
while(it.hasNext()) {
@ -2145,6 +2132,16 @@ public BlockInfo getStoredBlock(Block block) {
return blocksMap.getStoredBlock(block);
}
/** Should the access keys be updated? */
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
final boolean b = isBlockTokenEnabled && blockKeyUpdateInterval < updateTime;
if (b) {
blockTokenSecretManager.updateKeys();
}
return b;
}
/* updates a block in under replication queue */
public void updateNeededReplications(Block block, int curReplicasDelta,
int expectedReplicasDelta) {
@ -2355,58 +2352,12 @@ public BlockIterator getCorruptReplicaBlockIterator() {
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
/**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
// Check to see if all blocks in this decommissioned
// node has reached their target replication factor.
if (node.isDecommissionInProgress()) {
if (!isReplicationInProgress(node)) {
node.setDecommissioned();
LOG.info("Decommission complete for node " + node.getName());
}
}
return node.isDecommissioned();
}
/** Start decommissioning the specified datanode. */
void startDecommission(DatanodeDescriptor node) throws IOException {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName() + " with " +
node.numBlocks() + " blocks.");
synchronized (namesystem.heartbeats) {
namesystem.updateStats(node, false);
node.startDecommission();
namesystem.updateStats(node, true);
}
node.decommissioningStatus.setStartTime(now());
// all the blocks that reside on this node have to be replicated.
checkDecommissionStateInternal(node);
}
}
/** Stop decommissioning the specified datanodes. */
void stopDecommission(DatanodeDescriptor node) throws IOException {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning node " + node.getName());
synchronized (namesystem.heartbeats) {
namesystem.updateStats(node, false);
node.stopDecommission();
namesystem.updateStats(node, true);
}
processOverReplicatedBlocksOnReCommission(node);
}
}
/**
* Periodically calls computeReplicationWork().
*/
private class ReplicationMonitor implements Runnable {
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
private static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
private static final int REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
@Override
public void run() {
@ -2439,8 +2390,6 @@ public void run() {
*/
int computeDatanodeWork() throws IOException {
int workFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
// Blocks should not be replicated or removed if in safe mode.
// It's OK to check safe mode here w/o holding lock, in the worst
// case extra replications will be scheduled, and these will get
@ -2448,11 +2397,11 @@ int computeDatanodeWork() throws IOException {
if (namesystem.isInSafeMode())
return workFound;
synchronized (namesystem.heartbeats) {
blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
}
final int numlive = heartbeatManager.getLiveDatanodeCount();
final int blocksToProcess = numlive
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
final int nodesToProcess = (int) Math.ceil(numlive
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
workFound = this.computeReplicationWork(blocksToProcess);

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
@ -49,6 +51,7 @@
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@ -56,7 +59,6 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
@ -75,7 +77,10 @@
public class DatanodeManager {
static final Log LOG = LogFactory.getLog(DatanodeManager.class);
final FSNamesystem namesystem;
private final FSNamesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager;
/**
* Stores the datanode -> block map.
@ -117,9 +122,14 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
DatanodeManager(final FSNamesystem namesystem, final Configuration conf
DatanodeManager(final BlockManager blockManager,
final FSNamesystem namesystem, final Configuration conf
) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.heartbeatManager = new HeartbeatManager(namesystem, conf);
this.hostsReader = new HostsFileReader(
conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
@ -158,17 +168,30 @@ void activate(final Configuration conf) {
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
decommissionthread.start();
heartbeatManager.activate(conf);
}
void close() {
if (decommissionthread != null) decommissionthread.interrupt();
heartbeatManager.close();
}
/** @return the network topology. */
public NetworkTopology getNetworkTopology() {
return networktopology;
}
/** @return the heartbeat manager. */
HeartbeatManager getHeartbeatManager() {
return heartbeatManager;
}
/** @return the datanode statistics. */
public DatanodeStatistics getDatanodeStatistics() {
return heartbeatManager;
}
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
final List<LocatedBlock> locatedblocks) {
@ -231,9 +254,44 @@ void datanodeDump(final PrintWriter out) {
}
}
/**
* Remove a datanode descriptor.
* @param nodeInfo datanode descriptor.
*/
private void removeDatanode(DatanodeDescriptor nodeInfo) {
assert namesystem.hasWriteLock();
heartbeatManager.removeDatanode(nodeInfo);
blockManager.removeBlocksAssociatedTo(nodeInfo);
networktopology.remove(nodeInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + nodeInfo.getName());
}
namesystem.checkSafeMode();
}
/**
* Remove a datanode
* @throws UnregisteredNodeException
*/
public void removeDatanode(final DatanodeID node
) throws UnregisteredNodeException {
namesystem.writeLock();
try {
final DatanodeDescriptor descriptor = getDatanode(node);
if (descriptor != null) {
removeDatanode(descriptor);
} else {
NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
+ node.getName() + " does not exist");
}
} finally {
namesystem.writeUnlock();
}
}
/** Remove a dead datanode. */
public void removeDeadDatanode(final DatanodeID nodeID) {
synchronized(namesystem.heartbeats) {
void removeDeadDatanode(final DatanodeID nodeID) {
synchronized(datanodeMap) {
DatanodeDescriptor d;
try {
@ -244,14 +302,13 @@ public void removeDeadDatanode(final DatanodeID nodeID) {
if (d != null && isDatanodeDead(d)) {
NameNode.stateChangeLog.info(
"BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
namesystem.removeDatanode(d);
removeDatanode(d);
}
}
}
}
/** Is the datanode dead? */
public boolean isDatanodeDead(DatanodeDescriptor node) {
boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(Util.now() - heartbeatExpireInterval));
}
@ -423,11 +480,48 @@ private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
throws IOException {
// If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg, ipAddr)) {
namesystem.getBlockManager().startDecommission(nodeReg);
startDecommission(nodeReg);
}
}
/**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
boolean checkDecommissionState(DatanodeDescriptor node) {
// Check to see if all blocks in this decommissioned
// node has reached their target replication factor.
if (node.isDecommissionInProgress()) {
if (!blockManager.isReplicationInProgress(node)) {
node.setDecommissioned();
LOG.info("Decommission complete for node " + node.getName());
}
}
return node.isDecommissioned();
}
/** Start decommissioning the specified datanode. */
private void startDecommission(DatanodeDescriptor node) throws IOException {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName() + " with " +
node.numBlocks() + " blocks.");
heartbeatManager.startDecommission(node);
node.decommissioningStatus.setStartTime(now());
// all the blocks that reside on this node have to be replicated.
checkDecommissionState(node);
}
}
/** Stop decommissioning the specified datanodes. */
void stopDecommission(DatanodeDescriptor node) throws IOException {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning node " + node.getName());
heartbeatManager.stopDecommission(node);
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
}
/**
* Generate new storage ID.
*
@ -483,7 +577,7 @@ public void registerDatanode(DatanodeRegistration nodeReg
+ "node from name: " + nodeN.getName());
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
namesystem.removeDatanode(nodeN);
removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
@ -525,14 +619,7 @@ nodes with its data cleared (or user can just remove the StorageID
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
synchronized(namesystem.heartbeats) {
if( !namesystem.heartbeats.contains(nodeS)) {
namesystem.heartbeats.add(nodeS);
//update its timestamp
nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
nodeS.isAlive = true;
}
}
heartbeatManager.register(nodeS);
checkDecommissioning(nodeS, dnAddress);
return;
}
@ -556,12 +643,9 @@ nodes with its data cleared (or user can just remove the StorageID
checkDecommissioning(nodeDescr, dnAddress);
// also treat the registration message as a heartbeat
synchronized(namesystem.heartbeats) {
namesystem.heartbeats.add(nodeDescr);
nodeDescr.isAlive = true;
// no need to update its timestamp
// because its is done when the descriptor is created
}
// no need to update its timestamp
// because its is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
}
/** Reread include/exclude files. */
@ -589,12 +673,12 @@ public void refreshDatanodes() throws IOException {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node, null)) {
node.setDisallowed(true); // case 2.
node.setDisallowed(true); // case 2.
} else {
if (inExcludedHostsList(node, null)) {
namesystem.getBlockManager().startDecommission(node); // case 3.
startDecommission(node); // case 3.
} else {
namesystem.getBlockManager().stopDecommission(node); // case 4.
stopDecommission(node); // case 4.
}
}
}
@ -712,7 +796,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int maxTransfers, int failedVolumes
) throws IOException {
synchronized (namesystem.heartbeats) {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
@ -731,10 +815,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
namesystem.updateStats(nodeinfo, false);
nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, failedVolumes);
namesystem.updateStats(nodeinfo, true);
heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
remaining, blockPoolUsed, xceiverCount, failedVolumes);
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
/** Datanode statistics */
public interface DatanodeStatistics {
/** @return the total capacity */
public long getCapacityTotal();
/** @return the used capacity */
public long getCapacityUsed();
/** @return the percentage of the used capacity over the total capacity. */
public float getCapacityUsedPercent();
/** @return the remaining capacity */
public long getCapacityRemaining();
/** @return the percentage of the remaining capacity over the total capacity. */
public float getCapacityRemainingPercent();
/** @return the block pool used. */
public long getBlockPoolUsed();
/** @return the percentage of the block pool used space over the total capacity. */
public float getPercentBlockPoolUsed();
/** @return the xceiver count */
public int getXceiverCount();
/**
* @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system
*/
public long getCapacityUsedNonDFS();
/** The same as {@link ClientProtocol#getStats()}.
* The block related entries are set to -1.
*/
public long[] getStats();
}

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.CyclicIteration;
/**
* Manage node decommissioning.
@ -35,11 +34,9 @@ class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
private final FSNamesystem fsnamesystem;
private final BlockManager blockManager;
DecommissionManager(FSNamesystem namesystem) {
DecommissionManager(final FSNamesystem namesystem) {
this.fsnamesystem = namesystem;
this.blockManager = fsnamesystem.getBlockManager();
}
/** Periodically check decommission status. */
@ -81,16 +78,16 @@ public void run() {
}
private void check() {
final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry
: blockManager.getDatanodeManager().getDatanodeCyclicIteration(
firstkey)) {
: dm.getDatanodeCyclicIteration(firstkey)) {
final DatanodeDescriptor d = entry.getValue();
firstkey = entry.getKey();
if (d.isDecommissionInProgress()) {
try {
blockManager.checkDecommissionStateInternal(d);
dm.checkDecommissionState(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
}

View File

@ -0,0 +1,301 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.Daemon;
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
* by the heartbeat manager lock.
*/
class HeartbeatManager implements DatanodeStatistics {
static final Log LOG = LogFactory.getLog(HeartbeatManager.class);
/**
* Stores a subset of the datanodeMap in DatanodeManager,
* containing nodes that are considered alive.
* The HeartbeatMonitor periodically checks for out-dated entries,
* and removes them from the list.
* It is synchronized by the heartbeat manager lock.
*/
private final List<DatanodeDescriptor> datanodes = new ArrayList<DatanodeDescriptor>();
/** Statistics, which are synchronized by the heartbeat manager lock. */
private final Stats stats = new Stats();
/** The time period to check for expired datanodes */
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
final FSNamesystem namesystem;
HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) {
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.namesystem = namesystem;
}
void activate(Configuration conf) {
heartbeatThread.start();
}
void close() {
heartbeatThread.interrupt();
}
synchronized int getLiveDatanodeCount() {
return datanodes.size();
}
@Override
public synchronized long getCapacityTotal() {
return stats.capacityTotal;
}
@Override
public synchronized long getCapacityUsed() {
return stats.capacityUsed;
}
@Override
public synchronized float getCapacityUsedPercent() {
return DFSUtil.getPercentUsed(stats.capacityUsed, stats.capacityTotal);
}
@Override
public synchronized long getCapacityRemaining() {
return stats.capacityRemaining;
}
@Override
public synchronized float getCapacityRemainingPercent() {
return DFSUtil.getPercentRemaining(
stats.capacityRemaining, stats.capacityTotal);
}
@Override
public synchronized long getBlockPoolUsed() {
return stats.blockPoolUsed;
}
@Override
public synchronized float getPercentBlockPoolUsed() {
return DFSUtil.getPercentUsed(stats.blockPoolUsed, stats.capacityTotal);
}
@Override
public synchronized long getCapacityUsedNonDFS() {
final long nonDFSUsed = stats.capacityTotal
- stats.capacityRemaining - stats.capacityUsed;
return nonDFSUsed < 0L? 0L : nonDFSUsed;
}
@Override
public synchronized int getXceiverCount() {
return stats.xceiverCount;
}
@Override
public synchronized long[] getStats() {
return new long[] {getCapacityTotal(),
getCapacityUsed(),
getCapacityRemaining(),
-1L,
-1L,
-1L,
getBlockPoolUsed()};
}
synchronized void register(final DatanodeDescriptor d) {
if (!datanodes.contains(d)) {
addDatanode(d);
//update its timestamp
d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
}
}
synchronized DatanodeDescriptor[] getDatanodes() {
return datanodes.toArray(new DatanodeDescriptor[datanodes.size()]);
}
synchronized void addDatanode(final DatanodeDescriptor d) {
datanodes.add(d);
d.isAlive = true;
}
synchronized void removeDatanode(DatanodeDescriptor node) {
if (node.isAlive) {
stats.subtract(node);
datanodes.remove(node);
node.isAlive = false;
}
}
synchronized void updateHeartbeat(final DatanodeDescriptor node,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int failedVolumes) {
stats.subtract(node);
node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, failedVolumes);
stats.add(node);
}
synchronized void startDecommission(final DatanodeDescriptor node) {
stats.subtract(node);
node.startDecommission();
stats.add(node);
}
synchronized void stopDecommission(final DatanodeDescriptor node) {
stats.subtract(node);
node.stopDecommission();
stats.add(node);
}
/**
* Check if there are any expired heartbeats, and if so,
* whether any blocks have to be re-replicated.
* While removing dead datanodes, make sure that only one datanode is marked
* dead at a time within the synchronized section. Otherwise, a cascading
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
if (namesystem.isInSafeMode()) {
return;
}
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
if (dm.isDatanodeDead(d)) {
namesystem.incrExpiredHeartbeats();
dead = d;
break;
}
}
}
allAlive = dead == null;
if (!allAlive) {
// acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock();
if (namesystem.isInSafeMode()) {
return;
}
try {
synchronized(this) {
dm.removeDeadDatanode(dead);
}
} finally {
namesystem.writeUnlock();
}
}
}
}
/** Periodically check heartbeat and update block key */
private class Monitor implements Runnable {
private long lastHeartbeatCheck;
private long lastBlockKeyUpdate;
@Override
public void run() {
while(namesystem.isRunning()) {
try {
final long now = Util.now();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
heartbeatCheck();
lastHeartbeatCheck = now;
}
if (namesystem.getBlockManager().shouldUpdateBlockKey(
now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true;
}
}
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
}
}
}
}
/** Datanode statistics.
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
private static class Stats {
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
private void add(final DatanodeDescriptor node) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
capacityTotal += node.getDfsUsed();
}
}
private void subtract(final DatanodeDescriptor node) {
capacityUsed -= node.getDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
capacityTotal -= node.getDfsUsed();
}
}
}
}

View File

@ -86,7 +86,6 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -97,6 +96,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@ -207,9 +207,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
private PermissionStatus defaultPermission;
// FSNamesystemMetrics counter variables
@Metric private MutableCounterInt expiredHeartbeats;
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int totalLoad = 0;
// Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@ -221,24 +218,17 @@ private static final void logAuditEvent(UserGroupInformation ugi,
//
public FSDirectory dir;
private BlockManager blockManager;
private DatanodeStatistics datanodeStatistics;
// Block pool ID used by this namenode
String blockPoolId;
/**
* Stores a subset of datanodeMap, containing nodes that are considered alive.
* The HeartbeatMonitor periodically checks for out-dated entries,
* and removes them from the list.
*/
public ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
public LeaseManager leaseManager = new LeaseManager(this);
//
// Threaded object that checks to see if we have been
// getting heartbeats from all clients.
//
Daemon hbthread = null; // HeartbeatMonitor thread
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
@ -248,9 +238,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
private volatile boolean fsRunning = true;
long systemStart = 0;
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
private long heartbeatRecheckInterval;
//resourceRecheckInterval is how often namenode checks for the disk space availability
private long resourceRecheckInterval;
@ -303,6 +290,7 @@ private void initialize(Configuration conf, FSImage fsImage)
checkAvailableResources();
this.systemStart = now();
this.blockManager = new BlockManager(this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
@ -333,10 +321,7 @@ void activateSecretManager() throws IOException {
void activate(Configuration conf) throws IOException {
setBlockTotal();
blockManager.activate(conf);
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
hbthread.start();
lmthread.start();
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
@ -463,10 +448,6 @@ private void setConfigurationParameters(Configuration conf)
DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
@ -512,7 +493,6 @@ public void close() {
fsRunning = false;
try {
if (blockManager != null) blockManager.close();
if (hbthread != null) hbthread.interrupt();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
if (nnrmthread != null) nnrmthread.interrupt();
@ -622,7 +602,7 @@ BlocksWithLocations getBlocks(DatanodeID datanode, long size)
* Set permissions for an existing file.
* @throws IOException
*/
public void setPermission(String src, FsPermission permission)
void setPermission(String src, FsPermission permission)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
HdfsFileStatus resultingStat = null;
@ -651,7 +631,7 @@ public void setPermission(String src, FsPermission permission)
* Set owner for an existing file.
* @throws IOException
*/
public void setOwner(String src, String username, String group)
void setOwner(String src, String username, String group)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
HdfsFileStatus resultingStat = null;
@ -818,12 +798,6 @@ LocatedBlocks getBlockLocationsInternal(INodeFile inode,
lastBlock, last.isComplete());
}
}
/** Create a LocatedBlock. */
public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
final long offset, final boolean corrupt) throws IOException {
return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
}
/**
@ -1018,7 +992,7 @@ public void setTimes(String src, long mtime, long atime)
/**
* Create a symbolic link.
*/
public void createSymlink(String target, String link,
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
HdfsFileStatus resultingStat = null;
@ -1988,7 +1962,7 @@ private void renameToInternal(String src, String dst,
* @see ClientProtocol#delete(String, boolean) for detailed descriptoin and
* description of exceptions
*/
public boolean delete(String src, boolean recursive)
boolean delete(String src, boolean recursive)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -2118,7 +2092,7 @@ HdfsFileStatus getFileInfo(String src, boolean resolveLink)
/**
* Create all the necessary directories
*/
public boolean mkdirs(String src, PermissionStatus permissions,
boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
boolean status = false;
if(NameNode.stateChangeLog.isDebugEnabled()) {
@ -2536,7 +2510,7 @@ void renewLease(String holder) throws IOException {
* @throws UnresolvedLinkException if symbolic link is encountered
* @throws IOException if other I/O error occurred
*/
public DirectoryListing getListing(String src, byte[] startAfter,
DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation)
throws AccessControlException, UnresolvedLinkException, IOException {
DirectoryListing dl;
@ -2606,7 +2580,7 @@ public void registerDatanode(DatanodeRegistration nodeReg)
* @see #registerDatanode(DatanodeRegistration)
* @return registration ID
*/
public String getRegistrationID() {
String getRegistrationID() {
return Storage.getRegistrationID(dir.fsImage.getStorage());
}
@ -2627,7 +2601,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
throws IOException {
readLock();
try {
final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
@ -2655,35 +2630,6 @@ public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
}
}
public void updateStats(DatanodeDescriptor node, boolean isAdded) {
//
// The statistics are protected by the heartbeat lock
// For decommissioning/decommissioned nodes, only used capacity
// is counted.
//
assert(Thread.holdsLock(heartbeats));
if (isAdded) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
totalLoad += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
capacityTotal += node.getDfsUsed();
}
} else {
capacityUsed -= node.getDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
totalLoad -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
capacityTotal -= node.getDfsUsed();
}
}
}
/**
* Returns whether or not there were available resources at the last check of
@ -2735,86 +2681,7 @@ public void run () {
}
}
}
/**
* Periodically calls heartbeatCheck() and updateBlockKey()
*/
class HeartbeatMonitor implements Runnable {
private long lastHeartbeatCheck;
private long lastBlockKeyUpdate;
/**
*/
public void run() {
while (fsRunning) {
try {
long now = now();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
heartbeatCheck();
lastHeartbeatCheck = now;
}
if (blockManager.isBlockTokenEnabled()
&& (lastBlockKeyUpdate + blockManager.getBlockKeyUpdateInterval() < now)) {
blockManager.updateBlockKey();
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
FSNamesystem.LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
}
}
}
}
public void setNodeReplicationLimit(int limit) {
blockManager.maxReplicationStreams = limit;
}
/**
* Remove a datanode descriptor.
* @param nodeID datanode ID.
* @throws UnregisteredNodeException
*/
public void removeDatanode(final DatanodeID nodeID
) throws UnregisteredNodeException {
writeLock();
try {
DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
).getDatanode(nodeID);
if (nodeInfo != null) {
removeDatanode(nodeInfo);
} else {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+ nodeID.getName() + " does not exist");
}
} finally {
writeUnlock();
}
}
/**
* Remove a datanode descriptor.
* @param nodeInfo datanode descriptor.
*/
public void removeDatanode(DatanodeDescriptor nodeInfo) {
assert hasWriteLock();
synchronized (heartbeats) {
if (nodeInfo.isAlive) {
updateStats(nodeInfo, false);
heartbeats.remove(nodeInfo);
nodeInfo.isAlive = false;
}
}
blockManager.removeDatanode(nodeInfo);
checkSafeMode();
}
FSImage getFSImage() {
return dir.fsImage;
}
@ -2822,61 +2689,12 @@ FSImage getFSImage() {
FSEditLog getEditLog() {
return getFSImage().getEditLog();
}
/**
* Check if there are any expired heartbeats, and if so,
* whether any blocks have to be re-replicated.
* While removing dead datanodes, make sure that only one datanode is marked
* dead at a time within the synchronized section. Otherwise, a cascading
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
if (isInSafeMode()) {
return;
}
boolean allAlive = false;
while (!allAlive) {
boolean foundDead = false;
DatanodeID nodeID = null;
// locate the first dead node.
synchronized(heartbeats) {
for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
it.hasNext();) {
DatanodeDescriptor nodeInfo = it.next();
if (datanodeManager.isDatanodeDead(nodeInfo)) {
expiredHeartbeats.incr();
foundDead = true;
nodeID = nodeInfo;
break;
}
}
}
// acquire the fsnamesystem lock, and then remove the dead node.
if (foundDead) {
writeLock();
if (isInSafeMode()) {
return;
}
try {
datanodeManager.removeDeadDatanode(nodeID);
} finally {
writeUnlock();
}
}
allAlive = !foundDead;
}
}
/**
* The given node is reporting all its blocks. Use this info to
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
public void processReport(DatanodeID nodeID, String poolId,
void processReport(DatanodeID nodeID, String poolId,
BlockListAsLongs newReport) throws IOException {
long startTime, endTime;
@ -3057,15 +2875,18 @@ public long getMissingBlocksCount() {
return blockManager.getMissingBlocksCount();
}
/** Increment expired heartbeat counter. */
public void incrExpiredHeartbeats() {
expiredHeartbeats.incr();
}
/** @see ClientProtocol#getStats() */
long[] getStats() {
synchronized(heartbeats) {
return new long[] {this.capacityTotal, this.capacityUsed,
this.capacityRemaining,
getUnderReplicatedBlocks(),
getCorruptReplicaBlocks(),
getMissingBlocksCount(),
getBlockPoolUsedSpace()};
}
final long[] stats = datanodeStatistics.getStats();
stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
return stats;
}
/**
@ -3073,9 +2894,7 @@ long[] getStats() {
*/
@Override // FSNamesystemMBean
public long getCapacityTotal() {
synchronized(heartbeats) {
return capacityTotal;
}
return datanodeStatistics.getCapacityTotal();
}
@Metric
@ -3088,9 +2907,7 @@ public float getCapacityTotalGB() {
*/
@Override // FSNamesystemMBean
public long getCapacityUsed() {
synchronized(heartbeats) {
return capacityUsed;
}
return datanodeStatistics.getCapacityUsed();
}
@Metric
@ -3098,32 +2915,9 @@ public float getCapacityUsedGB() {
return DFSUtil.roundBytesToGB(getCapacityUsed());
}
/**
* Total used space by data nodes as percentage of total capacity
*/
public float getCapacityUsedPercent() {
synchronized(heartbeats){
return DFSUtil.getPercentUsed(capacityUsed, capacityTotal);
}
}
/**
* Total used space by data nodes for non DFS purposes such
* as storing temporary files on the local file system
*/
public long getCapacityUsedNonDFS() {
long nonDFSUsed = 0;
synchronized(heartbeats){
nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
}
return nonDFSUsed < 0 ? 0 : nonDFSUsed;
}
/**
* Total non-used raw bytes.
*/
@Override
public long getCapacityRemaining() {
synchronized(heartbeats) {
return capacityRemaining;
}
return datanodeStatistics.getCapacityRemaining();
}
@Metric
@ -3131,23 +2925,13 @@ public float getCapacityRemainingGB() {
return DFSUtil.roundBytesToGB(getCapacityRemaining());
}
/**
* Total remaining space by data nodes as percentage of total capacity
*/
public float getCapacityRemainingPercent() {
synchronized(heartbeats){
return DFSUtil.getPercentRemaining(capacityRemaining, capacityTotal);
}
}
/**
* Total number of connections.
*/
@Override // FSNamesystemMBean
@Metric
public int getTotalLoad() {
synchronized (heartbeats) {
return this.totalLoad;
}
return datanodeStatistics.getXceiverCount();
}
int getNumberOfDatanodes(DatanodeReportType type) {
@ -3757,8 +3541,9 @@ boolean setSafeMode(SafeModeAction action) throws IOException {
}
return isInSafeMode();
}
private void checkSafeMode() {
/** Check and trigger safe mode. */
public void checkSafeMode() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode != null) {
@ -4131,11 +3916,6 @@ public long getUnderReplicatedBlocks() {
return blockManager.getUnderReplicatedBlocksCount();
}
/** Return number of under-replicated but not missing blocks */
public long getUnderReplicatedNotMissingBlocks() {
return blockManager.getUnderReplicatedNotMissingBlocks();
}
/** Returns number of blocks with corrupt replicas */
@Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
public long getCorruptReplicaBlocks() {
@ -4207,14 +3987,14 @@ public int getNumDeadDataNodes() {
/**
* Sets the generation stamp for this filesystem
*/
public void setGenerationStamp(long stamp) {
void setGenerationStamp(long stamp) {
generationStamp.setStamp(stamp);
}
/**
* Gets the generation stamp for this filesystem
*/
public long getGenerationStamp() {
long getGenerationStamp() {
return generationStamp.getStamp();
}
@ -4854,31 +4634,27 @@ public boolean isUpgradeFinalized() {
@Override // NameNodeMXBean
public long getNonDfsUsedSpace() {
return getCapacityUsedNonDFS();
return datanodeStatistics.getCapacityUsedNonDFS();
}
@Override // NameNodeMXBean
public float getPercentUsed() {
return getCapacityUsedPercent();
return datanodeStatistics.getCapacityUsedPercent();
}
@Override // NameNodeMXBean
public long getBlockPoolUsedSpace() {
synchronized(heartbeats) {
return blockPoolUsed;
}
return datanodeStatistics.getBlockPoolUsed();
}
@Override // NameNodeMXBean
public float getPercentBlockPoolUsed() {
synchronized(heartbeats) {
return DFSUtil.getPercentUsed(blockPoolUsed, capacityTotal);
}
return datanodeStatistics.getPercentBlockPoolUsed();
}
@Override // NameNodeMXBean
public float getPercentRemaining() {
return getCapacityRemainingPercent();
return datanodeStatistics.getCapacityRemainingPercent();
}
@Override // NameNodeMXBean

View File

@ -1229,7 +1229,7 @@ public void errorReport(DatanodeRegistration nodeReg,
LOG.warn("Disk error on " + dnName + ": " + msg);
} else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
LOG.warn("Fatal disk error on " + dnName + ": " + msg);
namesystem.removeDatanode(nodeReg);
namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);
} else {
LOG.info("Error report from " + dnName + ": " + msg);
}

View File

@ -349,7 +349,7 @@ void generateHealthReport(JspWriter out, NameNode nn,
+ colTxt() + ":" + colTxt() + decommissioning.size()
+ rowTxt() + colTxt("Excludes missing blocks.")
+ "Number of Under-Replicated Blocks" + colTxt() + ":" + colTxt()
+ fsn.getUnderReplicatedNotMissingBlocks()
+ fsn.getBlockManager().getUnderReplicatedNotMissingBlocks()
+ "</table></div><br>\n");
if (live.isEmpty() && dead.isEmpty()) {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
/**
* The test makes sure that NameNode detects presense blocks that do not have
@ -56,6 +57,7 @@ public void testMissingBlocksAlert() throws IOException,
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
final BlockManager bm = cluster.getNamesystem().getBlockManager();
DistributedFileSystem dfs =
(DistributedFileSystem) cluster.getFileSystem();
@ -86,8 +88,7 @@ public void testMissingBlocksAlert() throws IOException,
}
assertTrue(dfs.getMissingBlocksCount() == 1);
assertEquals(4, dfs.getUnderReplicatedBlocksCount());
assertEquals(3,
cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());
// Now verify that it shows up on webui
@ -109,8 +110,7 @@ public void testMissingBlocksAlert() throws IOException,
}
assertEquals(2, dfs.getUnderReplicatedBlocksCount());
assertEquals(2,
cluster.getNamesystem().getUnderReplicatedNotMissingBlocks());
assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());
// and make sure WARNING disappears
// Now verify that it shows up on webui

View File

@ -25,10 +25,13 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon;
public class BlockManagerTestUtil {
public static void setNodeReplicationLimit(final BlockManager blockManager,
final int limit) {
blockManager.maxReplicationStreams = limit;
}
/** @return the datanode descriptor for the given the given storageID. */
public static DatanodeDescriptor getDatanode(final FSNamesystem ns,

View File

@ -16,10 +16,16 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
@ -31,18 +37,14 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.log4j.Level;
import static org.junit.Assert.*;
import org.junit.Test;
public class TestBlocksWithNotEnoughRacks {
public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
static {
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
}
@ -278,6 +280,7 @@ public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block with a replication factor of 2
@ -293,7 +296,7 @@ public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
DataNode dataNode = datanodes.get(idx);
DatanodeID dnId = dataNode.getDatanodeId();
cluster.stopDataNode(idx);
ns.removeDatanode(dnId);
dm.removeDatanode(dnId);
// The block should still have sufficient # replicas, across racks.
// The last node may not have contained a replica, but if it did
@ -307,7 +310,7 @@ public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
dataNode = datanodes.get(idx);
dnId = dataNode.getDatanodeId();
cluster.stopDataNode(idx);
ns.removeDatanode(dnId);
dm.removeDatanode(dnId);
// Make sure we have enough live replicas even though we are
// short one rack and therefore need one replica
@ -332,6 +335,7 @@ public void testReduceReplFactorDueToRejoinRespectsRackPolicy()
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block
@ -347,7 +351,7 @@ public void testReduceReplFactorDueToRejoinRespectsRackPolicy()
DataNode dataNode = datanodes.get(2);
DatanodeID dnId = dataNode.getDatanodeId();
cluster.stopDataNode(2);
ns.removeDatanode(dnId);
dm.removeDatanode(dnId);
// The block gets re-replicated to another datanode so it has a
// sufficient # replicas, but not across racks, so there should

View File

@ -45,8 +45,8 @@ public void testCompInvalidate() throws Exception {
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
DatanodeDescriptor[] nodes =
namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
final DatanodeDescriptor[] nodes = bm.getDatanodeManager(
).getHeartbeatManager().getDatanodes();
assertEquals(nodes.length, NUM_OF_DATANODES);
namesystem.writeLock();

View File

@ -52,6 +52,8 @@ public void testHeartbeat() throws Exception {
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
@ -69,7 +71,7 @@ public void testHeartbeat() throws Exception {
try {
namesystem.writeLock();
synchronized (namesystem.heartbeats) {
synchronized(hm) {
for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
dd.addBlockToBeReplicated(
new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collection;
import java.util.Iterator;
@ -33,7 +33,9 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
/**
* Test if live nodes count per node is correct
@ -57,6 +59,8 @@ public void testNodeCount() throws Exception {
new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).build();
try {
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final HeartbeatManager hm = bm.getDatanodeManager().getHeartbeatManager();
final FileSystem fs = cluster.getFileSystem();
// populate the cluster with a one block file
@ -66,8 +70,7 @@ public void testNodeCount() throws Exception {
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
// keep a copy of all datanode descriptor
DatanodeDescriptor[] datanodes =
namesystem.heartbeats.toArray(new DatanodeDescriptor[REPLICATION_FACTOR]);
final DatanodeDescriptor[] datanodes = hm.getDatanodes();
// start two new nodes
cluster.startDataNodes(conf, 2, true, null, null);
@ -80,9 +83,9 @@ public void testNodeCount() throws Exception {
// make sure that NN detects that the datanode is down
try {
namesystem.writeLock();
synchronized (namesystem.heartbeats) {
synchronized (hm) {
datanode.setLastUpdate(0); // mark it dead
namesystem.heartbeatCheck();
hm.heartbeatCheck();
}
} finally {
namesystem.writeUnlock();
@ -102,12 +105,12 @@ public void testNodeCount() throws Exception {
}
// find out a non-excess node
Iterator<DatanodeDescriptor> iter = namesystem.getBlockManager().blocksMap
final Iterator<DatanodeDescriptor> iter = bm.blocksMap
.nodeIterator(block.getLocalBlock());
DatanodeDescriptor nonExcessDN = null;
while (iter.hasNext()) {
DatanodeDescriptor dn = iter.next();
Collection<Block> blocks = namesystem.getBlockManager().excessReplicateMap.get(dn.getStorageID());
Collection<Block> blocks = bm.excessReplicateMap.get(dn.getStorageID());
if (blocks == null || !blocks.contains(block) ) {
nonExcessDN = dn;
break;
@ -121,9 +124,9 @@ public void testNodeCount() throws Exception {
try {
namesystem.writeLock();
synchronized (namesystem.heartbeats) {
synchronized(hm) {
nonExcessDN.setLastUpdate(0); // mark it dead
namesystem.heartbeatCheck();
hm.heartbeatCheck();
}
} finally {
namesystem.writeUnlock();

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.File;
import java.io.IOException;
@ -34,7 +34,9 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
public class TestOverReplicatedBlocks extends TestCase {
/** Test processOverReplicatedBlock can handle corrupt replicas fine.
@ -83,13 +85,15 @@ public void testProcesOverReplicateBlock() throws IOException {
cluster.getDataNodes().get(2), blockPoolId);
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final HeartbeatManager hm = bm.getDatanodeManager().getHeartbeatManager();
try {
namesystem.writeLock();
synchronized (namesystem.heartbeats) {
synchronized(hm) {
// set live datanode's remaining space to be 0
// so they will be chosen to be deleted when over-replication occurs
String corruptMachineName = corruptDataNode.getName();
for (DatanodeDescriptor datanode : namesystem.heartbeats) {
for (DatanodeDescriptor datanode : hm.getDatanodes()) {
if (!corruptMachineName.equals(datanode.getName())) {
datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
}
@ -100,8 +104,7 @@ public void testProcesOverReplicateBlock() throws IOException {
// corrupt one won't be chosen to be excess one
// without 4910 the number of live replicas would be 0: block gets lost
assertEquals(1, namesystem.getBlockManager().countNodes(block.getLocalBlock())
.liveReplicas());
assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas());
}
} finally {
namesystem.writeUnlock();

View File

@ -1128,7 +1128,8 @@ void generateInputs(int[] ignore) throws IOException {
// decommission data-nodes
decommissionNodes();
// set node replication limit
namesystem.setNodeReplicationLimit(nodeReplicationLimit);
BlockManagerTestUtil.setNodeReplicationLimit(namesystem.getBlockManager(),
nodeReplicationLimit);
}
private void decommissionNodes() throws IOException {
@ -1171,9 +1172,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
void printResults() {
String blockDistribution = "";
String delim = "(";
int totalReplicas = 0;
for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
delim = ", ";
}

View File

@ -112,10 +112,10 @@ public void testVolumeSize() throws Exception {
configCapacity = namesystem.getCapacityTotal();
used = namesystem.getCapacityUsed();
nonDFSUsed = namesystem.getCapacityUsedNonDFS();
nonDFSUsed = namesystem.getNonDfsUsedSpace();
remaining = namesystem.getCapacityRemaining();
percentUsed = namesystem.getCapacityUsedPercent();
percentRemaining = namesystem.getCapacityRemainingPercent();
percentUsed = namesystem.getPercentUsed();
percentRemaining = namesystem.getPercentRemaining();
bpUsed = namesystem.getBlockPoolUsedSpace();
percentBpUsed = namesystem.getPercentBlockPoolUsed();