HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. Contributed by Jing Zhao.

Jing Zhao 2015-10-14 16:17:49 -07:00
parent a8070259f8
commit be7a0add8b
14 changed files with 349 additions and 323 deletions

View File

@ -1527,6 +1527,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9238. Update TestFileCreation.testLeaseExpireHardLimit() to avoid using
DataNodeTestUtils.getFile(). (Tony Wu via lei)
HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. (jing9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -1022,9 +1022,9 @@ public void setBlockToken(final LocatedBlock b,
void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
final DatanodeDescriptor nodeinfo) {
// check access key update
if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate()) {
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
nodeinfo.needKeyUpdate = false;
nodeinfo.setNeedKeyUpdate(false);
}
}
@ -1966,7 +1966,7 @@ public boolean processReport(final DatanodeID nodeID,
try {
node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
if (node == null || !node.isAlive()) {
throw new IOException(
"ProcessReport from dead or unregistered node: " + nodeID);
}
@ -3528,7 +3528,7 @@ public void processIncrementalBlockReport(final DatanodeID nodeID,
int deleted = 0;
int receiving = 0;
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
if (node == null || !node.isAlive()) {
blockLog.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node {}", nodeID);
throw new IOException(
@ -3678,7 +3678,7 @@ boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
return false;
}
if (node.isAlive) {
if (node.isAlive()) {
return true;
}
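
Note: the recurring change in this file, and in most files below, is that callers stop touching DatanodeDescriptor's public mutable flags directly and go through accessors instead. A minimal sketch of the encapsulation, assuming nothing beyond what the diff shows:

// Sketch only: a stand-in for the relevant part of DatanodeDescriptor.
class DatanodeFlagsSketch {
  private boolean alive = false;         // was: public boolean isAlive
  private boolean needKeyUpdate = false; // was: public boolean needKeyUpdate

  boolean isAlive() { return alive; }
  void setAlive(boolean alive) { this.alive = alive; }

  boolean needKeyUpdate() { return needKeyUpdate; }
  void setNeedKeyUpdate(boolean needKeyUpdate) { this.needKeyUpdate = needKeyUpdate; }
}

Call sites such as node.isAlive become node.isAlive(), which keeps the field private and leaves room to add invariants in the setters later.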

View File

@ -131,7 +131,7 @@ void removeBlock(Block block) {
for(int idx = size - 1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
if (dn != null) {
dn.removeBlock(blockInfo); // remove from the list and wipe the location
removeBlock(dn, blockInfo); // remove from the list and wipe the location
}
}
}
@ -195,7 +195,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
return false;
// remove block from the data-node list and the node from the block info
boolean removed = node.removeBlock(info);
boolean removed = removeBlock(node, info);
if (info.hasNoStorage() // no datanodes left
&& info.isDeleted()) { // does not belong to a file
@ -204,6 +204,16 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
return removed;
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {
final DatanodeStorageInfo s = b.findStorageInfo(dn);
// if block exists on this datanode
return s != null && s.removeBlock(b);
}
int size() {
if (blocks != null) {
return blocks.size();
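
Note: the static removeBlock helper above replaces DatanodeDescriptor#removeBlock(BlockInfo) (deleted in the next file) and performs a two-way unlink: the block is dropped from the storage's block list and the storage from the block's locations. A minimal model of that contract, using simplified stand-in classes rather than the real HDFS types:

import java.util.ArrayList;
import java.util.List;

class Block {
  // storages currently holding a replica of this block
  final List<Storage> locations = new ArrayList<>();

  // stand-in for BlockInfo#findStorageInfo(DatanodeDescriptor)
  Storage findStorage(Datanode dn) {
    for (Storage s : locations) {
      if (s.owner == dn) {
        return s;
      }
    }
    return null;
  }
}

class Storage {
  final Datanode owner;
  final List<Block> blocks = new ArrayList<>();

  Storage(Datanode owner) {
    this.owner = owner;
  }

  // stand-in for DatanodeStorageInfo#removeBlock(BlockInfo)
  boolean removeBlock(Block b) {
    boolean removed = blocks.remove(b); // drop block from this storage
    b.locations.remove(this);           // drop this storage from the block
    return removed;
  }
}

class Datanode {}

public class RemoveBlockSketch {
  // mirrors the new static BlocksMap.removeBlock(dn, b)
  static boolean removeBlock(Datanode dn, Block b) {
    final Storage s = b.findStorage(dn);
    // if block exists on this datanode
    return s != null && s.removeBlock(b);
  }

  public static void main(String[] args) {
    Datanode dn = new Datanode();
    Storage s = new Storage(dn);
    Block b = new Block();
    s.blocks.add(b);
    b.locations.add(s);
    System.out.println(removeBlock(dn, b)); // true: replica unlinked
    System.out.println(removeBlock(dn, b)); // false: already gone
  }
}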

View File

@ -67,29 +67,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
public static final Logger LOG =
LoggerFactory.getLogger(DatanodeDescriptor.class);
public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
public final DecommissioningStatus decommissioningStatus =
new DecommissioningStatus();
private long curBlockReportId = 0;
private BitSet curBlockReportRpcsSeen = null;
public int updateBlockReportContext(BlockReportContext context) {
if (curBlockReportId != context.getReportId()) {
curBlockReportId = context.getReportId();
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
}
curBlockReportRpcsSeen.set(context.getCurRpc());
return curBlockReportRpcsSeen.cardinality();
}
public void clearBlockReportContext() {
curBlockReportId = 0;
curBlockReportRpcsSeen = null;
}
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private static final List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
ImmutableList.of();
/** Block and targets pair */
@InterfaceAudience.Private
@ -106,7 +86,7 @@ public static class BlockTargetPair {
/** A BlockTargetPair queue. */
private static class BlockQueue<E> {
private final Queue<E> blockq = new LinkedList<E>();
private final Queue<E> blockq = new LinkedList<>();
/** Size of the queue */
synchronized int size() {return blockq.size();}
@ -132,7 +112,7 @@ synchronized List<E> poll(int numBlocks) {
/**
* Returns <tt>true</tt> if the queue contains the specified element.
*/
boolean contains(E e) {
synchronized boolean contains(E e) {
return blockq.contains(e);
}
@ -141,9 +121,6 @@ synchronized void clear() {
}
}
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
/**
* A list of CachedBlock objects on this datanode.
*/
@ -172,6 +149,18 @@ public Type getType() {
}
}
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
public final DecommissioningStatus decommissioningStatus =
new DecommissioningStatus();
private long curBlockReportId = 0;
private BitSet curBlockReportRpcsSeen = null;
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
/**
* The blocks which we want to cache on this DataNode.
*/
@ -191,18 +180,6 @@ public Type getType() {
private final CachedBlocksList pendingUncached =
new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
public CachedBlocksList getPendingCached() {
return pendingCached;
}
public CachedBlocksList getCached() {
return cached;
}
public CachedBlocksList getPendingUncached() {
return pendingUncached;
}
/**
* The time when the last batch of caching directives was sent, in
* monotonic milliseconds.
@ -211,9 +188,8 @@ public CachedBlocksList getPendingUncached() {
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
public boolean needKeyUpdate = false;
private boolean isAlive = false;
private boolean needKeyUpdate = false;
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
@ -245,7 +221,6 @@ public CachedBlocksList getPendingUncached() {
private EnumCounters<StorageType> prevApproxBlocksScheduled
= new EnumCounters<>(StorageType.class);
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0;
private VolumeFailureSummary volumeFailureSummary = null;
@ -281,6 +256,48 @@ public DatanodeDescriptor(DatanodeID nodeID,
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
public int updateBlockReportContext(BlockReportContext context) {
if (curBlockReportId != context.getReportId()) {
curBlockReportId = context.getReportId();
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
}
curBlockReportRpcsSeen.set(context.getCurRpc());
return curBlockReportRpcsSeen.cardinality();
}
public void clearBlockReportContext() {
curBlockReportId = 0;
curBlockReportRpcsSeen = null;
}
public CachedBlocksList getPendingCached() {
return pendingCached;
}
public CachedBlocksList getCached() {
return cached;
}
public CachedBlocksList getPendingUncached() {
return pendingUncached;
}
public boolean isAlive() {
return isAlive;
}
public void setAlive(boolean isAlive) {
this.isAlive = isAlive;
}
public boolean needKeyUpdate() {
return needKeyUpdate;
}
public void setNeedKeyUpdate(boolean needKeyUpdate) {
this.needKeyUpdate = needKeyUpdate;
}
@VisibleForTesting
public DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) {
@ -316,9 +333,6 @@ boolean hasStaleStorages() {
}
}
static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
ImmutableList.of();
List<DatanodeStorageInfo> removeZombieStorages() {
List<DatanodeStorageInfo> zombies = null;
synchronized (storageMap) {
@ -344,28 +358,6 @@ List<DatanodeStorageInfo> removeZombieStorages() {
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
boolean removeBlock(BlockInfo b) {
final DatanodeStorageInfo s = b.findStorageInfo(this);
// if block exists on this datanode
if (s != null) {
return s.removeBlock(b);
}
return false;
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
boolean removeBlock(String storageID, BlockInfo b) {
DatanodeStorageInfo s = getStorageInfo(storageID);
return s != null && s.removeBlock(b);
}
public void resetBlocks() {
setCapacity(0);
setRemaining(0);
@ -384,10 +376,10 @@ public void resetBlocks() {
public void clearBlockQueues() {
synchronized (invalidateBlocks) {
this.invalidateBlocks.clear();
this.recoverBlocks.clear();
this.replicateBlocks.clear();
this.erasurecodeBlocks.clear();
}
this.recoverBlocks.clear();
this.replicateBlocks.clear();
this.erasurecodeBlocks.clear();
// pendingCached, cached, and pendingUncached are protected by the
// FSN lock.
this.pendingCached.clear();
@ -589,10 +581,6 @@ Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(getStorageInfos());
}
Iterator<BlockInfo> getBlockIterator(final String storageID) {
return new BlockIterator(getStorageInfo(storageID));
}
void incrementPendingReplicationWithoutTargets() {
pendingReplicationWithoutTargets++;
}
@ -662,16 +650,6 @@ public int getNumberOfBlocksToBeErasureCoded() {
return erasurecodeBlocks.size();
}
/**
* The number of block invalidation items that are pending to
* be sent to the datanode
*/
int getNumberOfBlocksToBeInvalidated() {
synchronized (invalidateBlocks) {
return invalidateBlocks.size();
}
}
public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}
@ -714,7 +692,6 @@ public boolean containsInvalidateBlock(Block block) {
*
* @param t requested storage type
* @param blockSize requested block size
* @return
*/
public DatanodeStorageInfo chooseStorage4Block(StorageType t,
long blockSize) {
@ -724,8 +701,7 @@ public DatanodeStorageInfo chooseStorage4Block(StorageType t,
long remaining = 0;
DatanodeStorageInfo storage = null;
for (DatanodeStorageInfo s : getStorageInfos()) {
if (s.getState() == State.NORMAL &&
s.getStorageType() == t) {
if (s.getState() == State.NORMAL && s.getStorageType() == t) {
if (storage == null) {
storage = s;
}
@ -761,7 +737,7 @@ public int getBlocksScheduled() {
/** Increment the number of blocks scheduled. */
void incrementBlocksScheduled(StorageType t) {
currApproxBlocksScheduled.add(t, 1);;
currApproxBlocksScheduled.add(t, 1);
}
/** Decrement the number of blocks scheduled. */
@ -805,7 +781,7 @@ public class DecommissioningStatus {
synchronized void set(int underRep,
int onlyRep, int underConstruction) {
if (isDecommissionInProgress() == false) {
if (!isDecommissionInProgress()) {
return;
}
underReplicatedBlocks = underRep;
@ -815,21 +791,21 @@ synchronized void set(int underRep,
/** @return the number of under-replicated blocks */
public synchronized int getUnderReplicatedBlocks() {
if (isDecommissionInProgress() == false) {
if (!isDecommissionInProgress()) {
return 0;
}
return underReplicatedBlocks;
}
/** @return the number of decommission-only replicas */
public synchronized int getDecommissionOnlyReplicas() {
if (isDecommissionInProgress() == false) {
if (!isDecommissionInProgress()) {
return 0;
}
return decommissionOnlyReplicas;
}
/** @return the number of under-replicated blocks in open files */
public synchronized int getUnderReplicatedInOpenFiles() {
if (isDecommissionInProgress() == false) {
if (!isDecommissionInProgress()) {
return 0;
}
return underReplicatedInOpenFiles;
@ -840,7 +816,7 @@ public synchronized void setStartTime(long time) {
}
/** @return start time */
public synchronized long getStartTime() {
if (isDecommissionInProgress() == false) {
if (!isDecommissionInProgress()) {
return 0;
}
return startTime;
@ -962,8 +938,7 @@ public void setLastCachingDirectiveSentTimeMs(long time) {
}
/**
* checks whether atleast first block report has been received
* @return
* @return whether at least first block report has been received
*/
public boolean checkBlockReportReceived() {
if(this.getStorageInfos().length == 0) {
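
Note: updateBlockReportContext, relocated above, decides when a block report that was split across several RPCs is complete. Each RPC sets one bit in a BitSet keyed by the report id; a retransmitted RPC sets an already-set bit, so the returned cardinality counts distinct RPCs seen. A self-contained restatement of that logic (the class name here is illustrative, not an HDFS API):

import java.util.BitSet;

public class BlockReportRpcTracker {
  private long curReportId = 0;
  private BitSet rpcsSeen = null;

  // returns how many distinct RPCs of the current report have arrived
  int update(long reportId, int totalRpcs, int curRpc) {
    if (curReportId != reportId) {
      // a new report id resets the tracking state
      curReportId = reportId;
      rpcsSeen = new BitSet(totalRpcs);
    }
    rpcsSeen.set(curRpc);
    return rpcsSeen.cardinality();
  }

  public static void main(String[] args) {
    BlockReportRpcTracker t = new BlockReportRpcTracker();
    System.out.println(t.update(42L, 3, 0)); // 1
    System.out.println(t.update(42L, 3, 0)); // 1: duplicate RPC, same bit
    System.out.println(t.update(42L, 3, 2)); // 2
    System.out.println(t.update(42L, 3, 1)); // 3: all RPCs accounted for
  }
}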

View File

@ -309,7 +309,7 @@ private static long getStaleIntervalFromConf(Configuration conf,
void activate(final Configuration conf) {
decomManager.activate(conf);
heartbeatManager.activate(conf);
heartbeatManager.activate();
}
void close() {
@ -659,7 +659,7 @@ private void decrementVersionCount(String version) {
}
private boolean shouldCountVersion(DatanodeDescriptor node) {
return node.getSoftwareVersion() != null && node.isAlive &&
return node.getSoftwareVersion() != null && node.isAlive() &&
!isDatanodeDead(node);
}
@ -1343,7 +1343,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive) {
if (nodeinfo == null || !nodeinfo.isAlive()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}

View File

@ -0,0 +1,193 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Datanode statistics.
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
class DatanodeStats {
private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
private long cacheCapacity = 0L;
private long cacheUsed = 0L;
private int nodesInService = 0;
private int nodesInServiceXceiverCount = 0;
private int expiredHeartbeats = 0;
synchronized void add(final DatanodeDescriptor node) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService++;
nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
capacityTotal += node.getDfsUsed();
}
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
Set<StorageType> storageTypes = new HashSet<>();
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
statsMap.addStorage(storageInfo, node);
storageTypes.add(storageInfo.getStorageType());
}
for (StorageType storageType : storageTypes) {
statsMap.addNode(storageType, node);
}
}
synchronized void subtract(final DatanodeDescriptor node) {
capacityUsed -= node.getDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService--;
nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
capacityTotal -= node.getDfsUsed();
}
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
Set<StorageType> storageTypes = new HashSet<>();
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
statsMap.subtractStorage(storageInfo, node);
storageTypes.add(storageInfo.getStorageType());
}
for (StorageType storageType : storageTypes) {
statsMap.subtractNode(storageType, node);
}
}
/** Increment expired heartbeat counter. */
void incrExpiredHeartbeats() {
expiredHeartbeats++;
}
synchronized Map<StorageType, StorageTypeStats> getStatsMap() {
return statsMap.get();
}
synchronized long getCapacityTotal() {
return capacityTotal;
}
synchronized long getCapacityUsed() {
return capacityUsed;
}
synchronized long getCapacityRemaining() {
return capacityRemaining;
}
synchronized long getBlockPoolUsed() {
return blockPoolUsed;
}
synchronized int getXceiverCount() {
return xceiverCount;
}
synchronized long getCacheCapacity() {
return cacheCapacity;
}
synchronized long getCacheUsed() {
return cacheUsed;
}
synchronized int getNodesInService() {
return nodesInService;
}
synchronized int getNodesInServiceXceiverCount() {
return nodesInServiceXceiverCount;
}
synchronized int getExpiredHeartbeats() {
return expiredHeartbeats;
}
synchronized float getCapacityRemainingPercent() {
return DFSUtilClient.getPercentRemaining(capacityRemaining, capacityTotal);
}
synchronized float getPercentBlockPoolUsed() {
return DFSUtilClient.getPercentUsed(blockPoolUsed, capacityTotal);
}
synchronized long getCapacityUsedNonDFS() {
final long nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
return nonDFSUsed < 0L? 0L : nonDFSUsed;
}
synchronized float getCapacityUsedPercent() {
return DFSUtilClient.getPercentUsed(capacityUsed, capacityTotal);
}
static final class StorageTypeStatsMap {
private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
new EnumMap<>(StorageType.class);
private Map<StorageType, StorageTypeStats> get() {
return new EnumMap<>(storageTypeStatsMap);
}
private void addNode(StorageType storageType,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(storageType);
if (storageTypeStats == null) {
storageTypeStats = new StorageTypeStats();
storageTypeStatsMap.put(storageType, storageTypeStats);
}
storageTypeStats.addNode(node);
}
private void addStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(info.getStorageType());
if (storageTypeStats == null) {
storageTypeStats = new StorageTypeStats();
storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
}
storageTypeStats.addStorage(info, node);
}
private void subtractStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(info.getStorageType());
if (storageTypeStats != null) {
storageTypeStats.subtractStorage(info, node);
}
}
private void subtractNode(StorageType storageType,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(storageType);
if (storageTypeStats != null) {
storageTypeStats.subtractNode(node);
}
}
}
}
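
Note: one visible difference from the old HeartbeatManager.Stats this class replaces (removed at the end of the HeartbeatManager diff below): StorageTypeStatsMap#get() now hands back a fresh EnumMap copy rather than a Collections.unmodifiableMap view over an IdentityHashMap, so callers get a point-in-time snapshot they can iterate without holding the DatanodeStats monitor. A minimal sketch of the view-versus-snapshot distinction, with a made-up enum and values:

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

public class SnapshotVsView {
  enum StorageType { DISK, SSD }

  public static void main(String[] args) {
    Map<StorageType, Long> live = new EnumMap<>(StorageType.class);
    live.put(StorageType.DISK, 1L);

    // old style: an unmodifiable view that still reflects later updates
    Map<StorageType, Long> view = Collections.unmodifiableMap(live);
    // new style: an independent copy taken at call time
    Map<StorageType, Long> snapshot = new EnumMap<>(live);

    live.put(StorageType.DISK, 2L);
    System.out.println(view.get(StorageType.DISK));     // 2: view tracks the map
    System.out.println(snapshot.get(StorageType.DISK)); // 1: frozen at copy time
  }
}

EnumMap is also the natural container here given that StorageType is an enum; the previous IdentityHashMap only worked because enum constants are singletons.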

View File

@ -216,7 +216,7 @@ public void stopDecommission(DatanodeDescriptor node) {
hbManager.stopDecommission(node);
// Over-replicated blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive) {
if (node.isAlive()) {
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
// Remove from tracking in DecommissionManager

View File

@ -18,18 +18,13 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -57,10 +52,10 @@ class HeartbeatManager implements DatanodeStatistics {
* and removes them from the list.
* It is synchronized by the heartbeat manager lock.
*/
private final List<DatanodeDescriptor> datanodes = new ArrayList<DatanodeDescriptor>();
private final List<DatanodeDescriptor> datanodes = new ArrayList<>();
/** Statistics, which are synchronized by the heartbeat manager lock. */
private final Stats stats = new Stats();
private final DatanodeStats stats = new DatanodeStats();
/** The time period to check for expired datanodes */
private final long heartbeatRecheckInterval;
@ -96,7 +91,7 @@ class HeartbeatManager implements DatanodeStatistics {
}
}
void activate(Configuration conf) {
void activate() {
heartbeatThread.start();
}
@ -105,7 +100,7 @@ void close() {
try {
// This will have no effect if the thread hasn't yet been started.
heartbeatThread.join(3000);
} catch (InterruptedException e) {
} catch (InterruptedException ignored) {
}
}
@ -114,74 +109,69 @@ synchronized int getLiveDatanodeCount() {
}
@Override
public synchronized long getCapacityTotal() {
return stats.capacityTotal;
public long getCapacityTotal() {
return stats.getCapacityTotal();
}
@Override
public synchronized long getCapacityUsed() {
return stats.capacityUsed;
public long getCapacityUsed() {
return stats.getCapacityUsed();
}
@Override
public synchronized float getCapacityUsedPercent() {
return DFSUtilClient.getPercentUsed(stats.capacityUsed, stats.capacityTotal);
public float getCapacityUsedPercent() {
return stats.getCapacityUsedPercent();
}
@Override
public synchronized long getCapacityRemaining() {
return stats.capacityRemaining;
public long getCapacityRemaining() {
return stats.getCapacityRemaining();
}
@Override
public synchronized float getCapacityRemainingPercent() {
return DFSUtilClient.getPercentRemaining(stats.capacityRemaining,
stats.capacityTotal);
public float getCapacityRemainingPercent() {
return stats.getCapacityRemainingPercent();
}
@Override
public synchronized long getBlockPoolUsed() {
return stats.blockPoolUsed;
public long getBlockPoolUsed() {
return stats.getBlockPoolUsed();
}
@Override
public synchronized float getPercentBlockPoolUsed() {
return DFSUtilClient.getPercentUsed(stats.blockPoolUsed,
stats.capacityTotal);
public float getPercentBlockPoolUsed() {
return stats.getPercentBlockPoolUsed();
}
@Override
public synchronized long getCapacityUsedNonDFS() {
final long nonDFSUsed = stats.capacityTotal
- stats.capacityRemaining - stats.capacityUsed;
return nonDFSUsed < 0L? 0L : nonDFSUsed;
public long getCapacityUsedNonDFS() {
return stats.getCapacityUsedNonDFS();
}
@Override
public synchronized int getXceiverCount() {
return stats.xceiverCount;
public int getXceiverCount() {
return stats.getXceiverCount();
}
@Override
public synchronized int getInServiceXceiverCount() {
return stats.nodesInServiceXceiverCount;
public int getInServiceXceiverCount() {
return stats.getNodesInServiceXceiverCount();
}
@Override
public synchronized int getNumDatanodesInService() {
return stats.nodesInService;
public int getNumDatanodesInService() {
return stats.getNodesInService();
}
@Override
public synchronized long getCacheCapacity() {
return stats.cacheCapacity;
public long getCacheCapacity() {
return stats.getCacheCapacity();
}
@Override
public synchronized long getCacheUsed() {
return stats.cacheUsed;
public long getCacheUsed() {
return stats.getCacheUsed();
}
@Override
public synchronized long[] getStats() {
@ -195,17 +185,17 @@ public synchronized long[] getStats() {
}
@Override
public synchronized int getExpiredHeartbeats() {
return stats.expiredHeartbeats;
public int getExpiredHeartbeats() {
return stats.getExpiredHeartbeats();
}
@Override
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return stats.statsMap.get();
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return stats.getStatsMap();
}
synchronized void register(final DatanodeDescriptor d) {
if (!d.isAlive) {
if (!d.isAlive()) {
addDatanode(d);
//update its timestamp
@ -221,14 +211,14 @@ synchronized void addDatanode(final DatanodeDescriptor d) {
// update in-service node count
stats.add(d);
datanodes.add(d);
d.isAlive = true;
d.setAlive(true);
}
synchronized void removeDatanode(DatanodeDescriptor node) {
if (node.isAlive) {
if (node.isAlive()) {
stats.subtract(node);
datanodes.remove(node);
node.isAlive = false;
node.setAlive(false);
}
}
@ -243,7 +233,7 @@ synchronized void updateHeartbeat(final DatanodeDescriptor node,
}
synchronized void startDecommission(final DatanodeDescriptor node) {
if (!node.isAlive) {
if (!node.isAlive()) {
LOG.info("Dead node {} is decommissioned immediately.", node);
node.setDecommissioned();
} else {
@ -255,8 +245,8 @@ synchronized void startDecommission(final DatanodeDescriptor node) {
synchronized void stopDecommission(final DatanodeDescriptor node) {
LOG.info("Stopping decommissioning of {} node {}",
node.isAlive ? "live" : "dead", node);
if (!node.isAlive) {
node.isAlive() ? "live" : "dead", node);
if (!node.isAlive()) {
node.stopDecommission();
} else {
stats.subtract(node);
@ -302,6 +292,7 @@ boolean shouldAbortHeartbeatCheck(long offset) {
* B. Remove all blocks in PendingDataNodeMessages for the failed storage
* when we remove all blocks from BlocksMap for that storage.
*/
@VisibleForTesting
void heartbeatCheck() {
final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
@ -354,16 +345,14 @@ void heartbeatCheck() {
}
allAlive = dead == null && failedStorage == null;
if (!allAlive && namesystem.isInStartupSafeMode()) {
return;
}
if (dead != null) {
// acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock();
try {
if (namesystem.isInStartupSafeMode()) {
return;
}
synchronized(this) {
dm.removeDeadDatanode(dead);
}
dm.removeDeadDatanode(dead);
} finally {
namesystem.writeUnlock();
}
@ -372,12 +361,7 @@ void heartbeatCheck() {
// acquire the fsnamesystem lock, and remove blocks on the storage.
namesystem.writeLock();
try {
if (namesystem.isInStartupSafeMode()) {
return;
}
synchronized(this) {
blockManager.removeBlocksAssociatedTo(failedStorage);
}
blockManager.removeBlocksAssociatedTo(failedStorage);
} finally {
namesystem.writeUnlock();
}
@ -385,7 +369,6 @@ void heartbeatCheck() {
}
}
/** Periodically check heartbeat and update block key */
private class Monitor implements Runnable {
private long lastHeartbeatCheck;
@ -404,7 +387,7 @@ public void run() {
if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true;
d.setNeedKeyUpdate(true);
}
}
lastBlockKeyUpdate = now;
@ -414,7 +397,7 @@ public void run() {
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
} catch (InterruptedException ignored) {
}
// avoid declaring nodes dead for another cycle if a GC pause lasts
// longer than the node recheck interval
@ -425,143 +408,4 @@ public void run() {
}
}
}
/** Datanode statistics.
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
private static class Stats {
private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
private long cacheCapacity = 0L;
private long cacheUsed = 0L;
private int nodesInService = 0;
private int nodesInServiceXceiverCount = 0;
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService++;
nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
capacityTotal += node.getDfsUsed();
}
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
Set<StorageType> storageTypes = new HashSet<>();
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
statsMap.addStorage(storageInfo, node);
storageTypes.add(storageInfo.getStorageType());
}
for (StorageType storageType : storageTypes) {
statsMap.addNode(storageType, node);
}
}
private void subtract(final DatanodeDescriptor node) {
capacityUsed -= node.getDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService--;
nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
capacityTotal -= node.getDfsUsed();
}
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
Set<StorageType> storageTypes = new HashSet<>();
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
statsMap.subtractStorage(storageInfo, node);
storageTypes.add(storageInfo.getStorageType());
}
for (StorageType storageType : storageTypes) {
statsMap.subtractNode(storageType, node);
}
}
/** Increment expired heartbeat counter. */
private void incrExpiredHeartbeats() {
expiredHeartbeats++;
}
}
/** StorageType specific statistics.
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
static final class StorageTypeStatsMap {
private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
new IdentityHashMap<>();
private StorageTypeStatsMap() {}
private StorageTypeStatsMap(StorageTypeStatsMap other) {
storageTypeStatsMap =
new IdentityHashMap<>(other.storageTypeStatsMap);
for (Map.Entry<StorageType, StorageTypeStats> entry :
storageTypeStatsMap.entrySet()) {
entry.setValue(new StorageTypeStats(entry.getValue()));
}
}
private Map<StorageType, StorageTypeStats> get() {
return Collections.unmodifiableMap(storageTypeStatsMap);
}
private void addNode(StorageType storageType,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(storageType);
if (storageTypeStats == null) {
storageTypeStats = new StorageTypeStats();
storageTypeStatsMap.put(storageType, storageTypeStats);
}
storageTypeStats.addNode(node);
}
private void addStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(info.getStorageType());
if (storageTypeStats == null) {
storageTypeStats = new StorageTypeStats();
storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
}
storageTypeStats.addStorage(info, node);
}
private void subtractStorage(final DatanodeStorageInfo info,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(info.getStorageType());
if (storageTypeStats != null) {
storageTypeStats.subtractStorage(info, node);
}
}
private void subtractNode(StorageType storageType,
final DatanodeDescriptor node) {
StorageTypeStats storageTypeStats =
storageTypeStatsMap.get(storageType);
if (storageTypeStats != null) {
storageTypeStats.subtractNode(node);
}
}
}
}
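
Note: the statistics getters in this file lose their synchronized modifier because locking moved into DatanodeStats, whose accessors synchronize on the stats object itself; readers no longer serialize on the heartbeat manager's monitor. A minimal sketch of the pattern, with hypothetical names and a single field standing in for the full statistics:

class StatsSketch {
  private long capacityTotal = 0;

  synchronized void add(long capacity) {
    capacityTotal += capacity;
  }

  synchronized long getCapacityTotal() {
    return capacityTotal;
  }
}

public class ManagerSketch {
  private final StatsSketch stats = new StatsSketch();

  // before: public synchronized long getCapacityTotal() { return stats.capacityTotal; }
  // after: no manager-level lock; StatsSketch guards its own state
  public long getCapacityTotal() {
    return stats.getCapacityTotal();
  }

  public static void main(String[] args) {
    ManagerSketch m = new ManagerSketch();
    m.stats.add(100L);
    System.out.println(m.getCapacityTotal()); // 100
  }
}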

View File

@ -86,7 +86,7 @@ void setChosenAsPrimary(boolean chosenAsPrimary) {
* Is data-node the replica belongs to alive.
*/
boolean isAlive() {
return expectedLocation.getDatanodeDescriptor().isAlive;
return expectedLocation.getDatanodeDescriptor().isAlive();
}
@Override // Block

View File

@ -934,7 +934,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
try {
final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID);
if (datanode == null || !datanode.isAlive) {
if (datanode == null || !datanode.isAlive()) {
throw new IOException(
"processCacheReport from dead or unregistered datanode: " +
datanode);

View File

@ -1768,7 +1768,7 @@ public Boolean get() {
FSNamesystem namesystem = cluster.getNamesystem();
final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
namesystem, nodeID);
return (dd.isAlive == alive);
return (dd.isAlive() == alive);
}
}, 100, waitTime);
}

View File

@ -595,7 +595,7 @@ public void testFavorDecomUntilHardLimit() throws Exception {
public void testSafeModeIBR() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.isAlive = true;
node.setAlive(true);
DatanodeRegistration nodeReg =
new DatanodeRegistration(node, null, null, "");
@ -640,7 +640,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.isAlive = true;
node.setAlive(true);
DatanodeRegistration nodeReg =
new DatanodeRegistration(node, null, null, "");
@ -672,7 +672,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
DatanodeDescriptor node = nodes.get(0);
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.isAlive = true;
node.setAlive(true);
DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, "");
// register new node

View File

@ -38,7 +38,9 @@ public void testInitializeBlockRecovery() throws Exception {
DatanodeStorageInfo s3 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.3", "s3");
DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
dd1.setAlive(true);
dd2.setAlive(true);
dd3.setAlive(true);
BlockInfoContiguous blockInfo = new BlockInfoContiguous(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,

View File

@ -66,7 +66,7 @@ public void testBlocksCounter() throws Exception {
assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED);
assertEquals(1, dd.numBlocks());
// remove a non-existent block
assertFalse(dd.removeBlock(blk1));
assertFalse(BlocksMap.removeBlock(dd, blk1));
assertEquals(1, dd.numBlocks());
// add an existent block
assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED);
@ -75,10 +75,10 @@ public void testBlocksCounter() throws Exception {
assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED);
assertEquals(2, dd.numBlocks());
// remove first block
assertTrue(dd.removeBlock(blk));
assertTrue(BlocksMap.removeBlock(dd, blk));
assertEquals(1, dd.numBlocks());
// remove second block
assertTrue(dd.removeBlock(blk1));
assertTrue(BlocksMap.removeBlock(dd, blk1));
assertEquals(0, dd.numBlocks());
}
}