From be7a0add8b6561d3c566237cc0370b06e7f32bb4 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 14 Oct 2015 16:17:49 -0700 Subject: [PATCH] HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. Contributed by Jing Zhao. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../server/blockmanagement/BlockManager.java | 10 +- .../server/blockmanagement/BlocksMap.java | 14 +- .../blockmanagement/DatanodeDescriptor.java | 169 +++++------- .../blockmanagement/DatanodeManager.java | 6 +- .../server/blockmanagement/DatanodeStats.java | 193 +++++++++++++ .../blockmanagement/DecommissionManager.java | 2 +- .../blockmanagement/HeartbeatManager.java | 254 ++++-------------- .../ReplicaUnderConstruction.java | 2 +- .../hdfs/server/namenode/CacheManager.java | 2 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../blockmanagement/TestBlockManager.java | 6 +- .../TestBlockUnderConstructionFeature.java | 4 +- .../TestDatanodeDescriptor.java | 6 +- 14 files changed, 349 insertions(+), 323 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 127cb0d230..847a2844ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1527,6 +1527,8 @@ Release 2.8.0 - UNRELEASED HDFS-9238. Update TestFileCreation.testLeaseExpireHardLimit() to avoid using DataNodeTestUtils.getFile(). (Tony Wu via lei) + HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. (jing9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index cdf43fb98b..418522049f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1022,9 +1022,9 @@ public void setBlockToken(final LocatedBlock b, void addKeyUpdateCommand(final List cmds, final DatanodeDescriptor nodeinfo) { // check access key update - if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate) { + if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate()) { cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys())); - nodeinfo.needKeyUpdate = false; + nodeinfo.setNeedKeyUpdate(false); } } @@ -1966,7 +1966,7 @@ public boolean processReport(final DatanodeID nodeID, try { node = datanodeManager.getDatanode(nodeID); - if (node == null || !node.isAlive) { + if (node == null || !node.isAlive()) { throw new IOException( "ProcessReport from dead or unregistered node: " + nodeID); } @@ -3528,7 +3528,7 @@ public void processIncrementalBlockReport(final DatanodeID nodeID, int deleted = 0; int receiving = 0; final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); - if (node == null || !node.isAlive) { + if (node == null || !node.isAlive()) { blockLog.warn("BLOCK* processIncrementalBlockReport" + " is received from dead or unregistered node {}", nodeID); throw new IOException( @@ -3678,7 +3678,7 @@ boolean isNodeHealthyForDecommission(DatanodeDescriptor node) { return false; } - if (node.isAlive) { + if (node.isAlive()) { return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index f41b093fcd..ed05e3a21f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -131,7 +131,7 @@ void removeBlock(Block block) { for(int idx = size - 1; idx >= 0; idx--) { DatanodeDescriptor dn = blockInfo.getDatanode(idx); if (dn != null) { - dn.removeBlock(blockInfo); // remove from the list and wipe the location + removeBlock(dn, blockInfo); // remove from the list and wipe the location } } } @@ -195,7 +195,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) { return false; // remove block from the data-node list and the node from the block info - boolean removed = node.removeBlock(info); + boolean removed = removeBlock(node, info); if (info.hasNoStorage() // no datanodes left && info.isDeleted()) { // does not belong to a file @@ -204,6 +204,16 @@ boolean removeNode(Block b, DatanodeDescriptor node) { return removed; } + /** + * Remove block from the list of blocks belonging to the data-node. Remove + * data-node from the block. + */ + static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) { + final DatanodeStorageInfo s = b.findStorageInfo(dn); + // if block exists on this datanode + return s != null && s.removeBlock(b); + } + int size() { if (blocks != null) { return blocks.size(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index fde645ebb2..e5563eb5cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -67,29 +67,9 @@ public class DatanodeDescriptor extends DatanodeInfo { public static final Logger LOG = LoggerFactory.getLogger(DatanodeDescriptor.class); public static final DatanodeDescriptor[] EMPTY_ARRAY = {}; - - // Stores status of decommissioning. - // If node is not decommissioning, do not use this object for anything. - public final DecommissioningStatus decommissioningStatus = - new DecommissioningStatus(); - - private long curBlockReportId = 0; - - private BitSet curBlockReportRpcsSeen = null; - - public int updateBlockReportContext(BlockReportContext context) { - if (curBlockReportId != context.getReportId()) { - curBlockReportId = context.getReportId(); - curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); - } - curBlockReportRpcsSeen.set(context.getCurRpc()); - return curBlockReportRpcsSeen.cardinality(); - } - - public void clearBlockReportContext() { - curBlockReportId = 0; - curBlockReportRpcsSeen = null; - } + private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min + private static final List EMPTY_STORAGE_INFO_LIST = + ImmutableList.of(); /** Block and targets pair */ @InterfaceAudience.Private @@ -106,7 +86,7 @@ public static class BlockTargetPair { /** A BlockTargetPair queue. */ private static class BlockQueue { - private final Queue blockq = new LinkedList(); + private final Queue blockq = new LinkedList<>(); /** Size of the queue */ synchronized int size() {return blockq.size();} @@ -132,7 +112,7 @@ synchronized List poll(int numBlocks) { /** * Returns true if the queue contains the specified element. */ - boolean contains(E e) { + synchronized boolean contains(E e) { return blockq.contains(e); } @@ -141,9 +121,6 @@ synchronized void clear() { } } - private final Map storageMap = - new HashMap<>(); - /** * A list of CachedBlock objects on this datanode. */ @@ -172,6 +149,18 @@ public Type getType() { } } + // Stores status of decommissioning. + // If node is not decommissioning, do not use this object for anything. + public final DecommissioningStatus decommissioningStatus = + new DecommissioningStatus(); + + private long curBlockReportId = 0; + + private BitSet curBlockReportRpcsSeen = null; + + private final Map storageMap = + new HashMap<>(); + /** * The blocks which we want to cache on this DataNode. */ @@ -191,18 +180,6 @@ public Type getType() { private final CachedBlocksList pendingUncached = new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED); - public CachedBlocksList getPendingCached() { - return pendingCached; - } - - public CachedBlocksList getCached() { - return cached; - } - - public CachedBlocksList getPendingUncached() { - return pendingUncached; - } - /** * The time when the last batch of caching directives was sent, in * monotonic milliseconds. @@ -211,9 +188,8 @@ public CachedBlocksList getPendingUncached() { // isAlive == heartbeats.contains(this) // This is an optimization, because contains takes O(n) time on Arraylist - public boolean isAlive = false; - public boolean needKeyUpdate = false; - + private boolean isAlive = false; + private boolean needKeyUpdate = false; // A system administrator can tune the balancer bandwidth parameter // (dfs.balance.bandwidthPerSec) dynamically by calling @@ -245,7 +221,6 @@ public CachedBlocksList getPendingUncached() { private EnumCounters prevApproxBlocksScheduled = new EnumCounters<>(StorageType.class); private long lastBlocksScheduledRollTime = 0; - private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min private int volumeFailures = 0; private VolumeFailureSummary volumeFailureSummary = null; @@ -281,6 +256,48 @@ public DatanodeDescriptor(DatanodeID nodeID, updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null); } + public int updateBlockReportContext(BlockReportContext context) { + if (curBlockReportId != context.getReportId()) { + curBlockReportId = context.getReportId(); + curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); + } + curBlockReportRpcsSeen.set(context.getCurRpc()); + return curBlockReportRpcsSeen.cardinality(); + } + + public void clearBlockReportContext() { + curBlockReportId = 0; + curBlockReportRpcsSeen = null; + } + + public CachedBlocksList getPendingCached() { + return pendingCached; + } + + public CachedBlocksList getCached() { + return cached; + } + + public CachedBlocksList getPendingUncached() { + return pendingUncached; + } + + public boolean isAlive() { + return isAlive; + } + + public void setAlive(boolean isAlive) { + this.isAlive = isAlive; + } + + public boolean needKeyUpdate() { + return needKeyUpdate; + } + + public void setNeedKeyUpdate(boolean needKeyUpdate) { + this.needKeyUpdate = needKeyUpdate; + } + @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { @@ -316,9 +333,6 @@ boolean hasStaleStorages() { } } - static final private List EMPTY_STORAGE_INFO_LIST = - ImmutableList.of(); - List removeZombieStorages() { List zombies = null; synchronized (storageMap) { @@ -344,28 +358,6 @@ List removeZombieStorages() { return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; } - /** - * Remove block from the list of blocks belonging to the data-node. Remove - * data-node from the block. - */ - boolean removeBlock(BlockInfo b) { - final DatanodeStorageInfo s = b.findStorageInfo(this); - // if block exists on this datanode - if (s != null) { - return s.removeBlock(b); - } - return false; - } - - /** - * Remove block from the list of blocks belonging to the data-node. Remove - * data-node from the block. - */ - boolean removeBlock(String storageID, BlockInfo b) { - DatanodeStorageInfo s = getStorageInfo(storageID); - return s != null && s.removeBlock(b); - } - public void resetBlocks() { setCapacity(0); setRemaining(0); @@ -384,10 +376,10 @@ public void resetBlocks() { public void clearBlockQueues() { synchronized (invalidateBlocks) { this.invalidateBlocks.clear(); - this.recoverBlocks.clear(); - this.replicateBlocks.clear(); - this.erasurecodeBlocks.clear(); } + this.recoverBlocks.clear(); + this.replicateBlocks.clear(); + this.erasurecodeBlocks.clear(); // pendingCached, cached, and pendingUncached are protected by the // FSN lock. this.pendingCached.clear(); @@ -589,10 +581,6 @@ Iterator getBlockIterator() { return new BlockIterator(getStorageInfos()); } - Iterator getBlockIterator(final String storageID) { - return new BlockIterator(getStorageInfo(storageID)); - } - void incrementPendingReplicationWithoutTargets() { pendingReplicationWithoutTargets++; } @@ -662,16 +650,6 @@ public int getNumberOfBlocksToBeErasureCoded() { return erasurecodeBlocks.size(); } - /** - * The number of block invalidation items that are pending to - * be sent to the datanode - */ - int getNumberOfBlocksToBeInvalidated() { - synchronized (invalidateBlocks) { - return invalidateBlocks.size(); - } - } - public List getReplicationCommand(int maxTransfers) { return replicateBlocks.poll(maxTransfers); } @@ -714,7 +692,6 @@ public boolean containsInvalidateBlock(Block block) { * * @param t requested storage type * @param blockSize requested block size - * @return */ public DatanodeStorageInfo chooseStorage4Block(StorageType t, long blockSize) { @@ -724,8 +701,7 @@ public DatanodeStorageInfo chooseStorage4Block(StorageType t, long remaining = 0; DatanodeStorageInfo storage = null; for (DatanodeStorageInfo s : getStorageInfos()) { - if (s.getState() == State.NORMAL && - s.getStorageType() == t) { + if (s.getState() == State.NORMAL && s.getStorageType() == t) { if (storage == null) { storage = s; } @@ -761,7 +737,7 @@ public int getBlocksScheduled() { /** Increment the number of blocks scheduled. */ void incrementBlocksScheduled(StorageType t) { - currApproxBlocksScheduled.add(t, 1);; + currApproxBlocksScheduled.add(t, 1); } /** Decrement the number of blocks scheduled. */ @@ -805,7 +781,7 @@ public class DecommissioningStatus { synchronized void set(int underRep, int onlyRep, int underConstruction) { - if (isDecommissionInProgress() == false) { + if (!isDecommissionInProgress()) { return; } underReplicatedBlocks = underRep; @@ -815,21 +791,21 @@ synchronized void set(int underRep, /** @return the number of under-replicated blocks */ public synchronized int getUnderReplicatedBlocks() { - if (isDecommissionInProgress() == false) { + if (!isDecommissionInProgress()) { return 0; } return underReplicatedBlocks; } /** @return the number of decommission-only replicas */ public synchronized int getDecommissionOnlyReplicas() { - if (isDecommissionInProgress() == false) { + if (!isDecommissionInProgress()) { return 0; } return decommissionOnlyReplicas; } /** @return the number of under-replicated blocks in open files */ public synchronized int getUnderReplicatedInOpenFiles() { - if (isDecommissionInProgress() == false) { + if (!isDecommissionInProgress()) { return 0; } return underReplicatedInOpenFiles; @@ -840,7 +816,7 @@ public synchronized void setStartTime(long time) { } /** @return start time */ public synchronized long getStartTime() { - if (isDecommissionInProgress() == false) { + if (!isDecommissionInProgress()) { return 0; } return startTime; @@ -962,8 +938,7 @@ public void setLastCachingDirectiveSentTimeMs(long time) { } /** - * checks whether atleast first block report has been received - * @return + * @return whether at least first block report has been received */ public boolean checkBlockReportReceived() { if(this.getStorageInfos().length == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 28ab7161c4..2436001c6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -309,7 +309,7 @@ private static long getStaleIntervalFromConf(Configuration conf, void activate(final Configuration conf) { decomManager.activate(conf); - heartbeatManager.activate(conf); + heartbeatManager.activate(); } void close() { @@ -659,7 +659,7 @@ private void decrementVersionCount(String version) { } private boolean shouldCountVersion(DatanodeDescriptor node) { - return node.getSoftwareVersion() != null && node.isAlive && + return node.getSoftwareVersion() != null && node.isAlive() && !isDatanodeDead(node); } @@ -1343,7 +1343,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, throw new DisallowedDatanodeException(nodeinfo); } - if (nodeinfo == null || !nodeinfo.isAlive) { + if (nodeinfo == null || !nodeinfo.isAlive()) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java new file mode 100644 index 0000000000..b070e68e34 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java @@ -0,0 +1,193 @@ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtilClient; + +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Datanode statistics. + * For decommissioning/decommissioned nodes, only used capacity is counted. + */ +class DatanodeStats { + + private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap(); + private long capacityTotal = 0L; + private long capacityUsed = 0L; + private long capacityRemaining = 0L; + private long blockPoolUsed = 0L; + private int xceiverCount = 0; + private long cacheCapacity = 0L; + private long cacheUsed = 0L; + + private int nodesInService = 0; + private int nodesInServiceXceiverCount = 0; + private int expiredHeartbeats = 0; + + synchronized void add(final DatanodeDescriptor node) { + capacityUsed += node.getDfsUsed(); + blockPoolUsed += node.getBlockPoolUsed(); + xceiverCount += node.getXceiverCount(); + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService++; + nodesInServiceXceiverCount += node.getXceiverCount(); + capacityTotal += node.getCapacity(); + capacityRemaining += node.getRemaining(); + } else { + capacityTotal += node.getDfsUsed(); + } + cacheCapacity += node.getCacheCapacity(); + cacheUsed += node.getCacheUsed(); + Set storageTypes = new HashSet<>(); + for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) { + statsMap.addStorage(storageInfo, node); + storageTypes.add(storageInfo.getStorageType()); + } + for (StorageType storageType : storageTypes) { + statsMap.addNode(storageType, node); + } + } + + synchronized void subtract(final DatanodeDescriptor node) { + capacityUsed -= node.getDfsUsed(); + blockPoolUsed -= node.getBlockPoolUsed(); + xceiverCount -= node.getXceiverCount(); + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService--; + nodesInServiceXceiverCount -= node.getXceiverCount(); + capacityTotal -= node.getCapacity(); + capacityRemaining -= node.getRemaining(); + } else { + capacityTotal -= node.getDfsUsed(); + } + cacheCapacity -= node.getCacheCapacity(); + cacheUsed -= node.getCacheUsed(); + Set storageTypes = new HashSet<>(); + for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) { + statsMap.subtractStorage(storageInfo, node); + storageTypes.add(storageInfo.getStorageType()); + } + for (StorageType storageType : storageTypes) { + statsMap.subtractNode(storageType, node); + } + } + + /** Increment expired heartbeat counter. */ + void incrExpiredHeartbeats() { + expiredHeartbeats++; + } + + synchronized Map getStatsMap() { + return statsMap.get(); + } + + synchronized long getCapacityTotal() { + return capacityTotal; + } + + synchronized long getCapacityUsed() { + return capacityUsed; + } + + synchronized long getCapacityRemaining() { + return capacityRemaining; + } + + synchronized long getBlockPoolUsed() { + return blockPoolUsed; + } + + synchronized int getXceiverCount() { + return xceiverCount; + } + + synchronized long getCacheCapacity() { + return cacheCapacity; + } + + synchronized long getCacheUsed() { + return cacheUsed; + } + + synchronized int getNodesInService() { + return nodesInService; + } + + synchronized int getNodesInServiceXceiverCount() { + return nodesInServiceXceiverCount; + } + + synchronized int getExpiredHeartbeats() { + return expiredHeartbeats; + } + + synchronized float getCapacityRemainingPercent() { + return DFSUtilClient.getPercentRemaining(capacityRemaining, capacityTotal); + } + + synchronized float getPercentBlockPoolUsed() { + return DFSUtilClient.getPercentUsed(blockPoolUsed, capacityTotal); + } + + synchronized long getCapacityUsedNonDFS() { + final long nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed; + return nonDFSUsed < 0L? 0L : nonDFSUsed; + } + + synchronized float getCapacityUsedPercent() { + return DFSUtilClient.getPercentUsed(capacityUsed, capacityTotal); + } + + static final class StorageTypeStatsMap { + + private Map storageTypeStatsMap = + new EnumMap<>(StorageType.class); + + private Map get() { + return new EnumMap<>(storageTypeStatsMap); + } + + private void addNode(StorageType storageType, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(storageType); + if (storageTypeStats == null) { + storageTypeStats = new StorageTypeStats(); + storageTypeStatsMap.put(storageType, storageTypeStats); + } + storageTypeStats.addNode(node); + } + + private void addStorage(final DatanodeStorageInfo info, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(info.getStorageType()); + if (storageTypeStats == null) { + storageTypeStats = new StorageTypeStats(); + storageTypeStatsMap.put(info.getStorageType(), storageTypeStats); + } + storageTypeStats.addStorage(info, node); + } + + private void subtractStorage(final DatanodeStorageInfo info, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(info.getStorageType()); + if (storageTypeStats != null) { + storageTypeStats.subtractStorage(info, node); + } + } + + private void subtractNode(StorageType storageType, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(storageType); + if (storageTypeStats != null) { + storageTypeStats.subtractNode(node); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index fb86ff3e9f..1f1ae091a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -216,7 +216,7 @@ public void stopDecommission(DatanodeDescriptor node) { hbManager.stopDecommission(node); // Over-replicated blocks will be detected and processed when // the dead node comes back and send in its full block report. - if (node.isAlive) { + if (node.isAlive()) { blockManager.processOverReplicatedBlocksOnReCommission(node); } // Remove from tracking in DecommissionManager diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index f2e9827b8c..7d34d4f8ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -18,18 +18,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -57,10 +52,10 @@ class HeartbeatManager implements DatanodeStatistics { * and removes them from the list. * It is synchronized by the heartbeat manager lock. */ - private final List datanodes = new ArrayList(); + private final List datanodes = new ArrayList<>(); /** Statistics, which are synchronized by the heartbeat manager lock. */ - private final Stats stats = new Stats(); + private final DatanodeStats stats = new DatanodeStats(); /** The time period to check for expired datanodes */ private final long heartbeatRecheckInterval; @@ -96,7 +91,7 @@ class HeartbeatManager implements DatanodeStatistics { } } - void activate(Configuration conf) { + void activate() { heartbeatThread.start(); } @@ -105,7 +100,7 @@ void close() { try { // This will no effect if the thread hasn't yet been started. heartbeatThread.join(3000); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } @@ -114,74 +109,69 @@ synchronized int getLiveDatanodeCount() { } @Override - public synchronized long getCapacityTotal() { - return stats.capacityTotal; + public long getCapacityTotal() { + return stats.getCapacityTotal(); } @Override - public synchronized long getCapacityUsed() { - return stats.capacityUsed; + public long getCapacityUsed() { + return stats.getCapacityUsed(); } @Override - public synchronized float getCapacityUsedPercent() { - return DFSUtilClient.getPercentUsed(stats.capacityUsed, stats.capacityTotal); + public float getCapacityUsedPercent() { + return stats.getCapacityUsedPercent(); } @Override - public synchronized long getCapacityRemaining() { - return stats.capacityRemaining; + public long getCapacityRemaining() { + return stats.getCapacityRemaining(); } @Override - public synchronized float getCapacityRemainingPercent() { - return DFSUtilClient.getPercentRemaining(stats.capacityRemaining, - stats.capacityTotal); + public float getCapacityRemainingPercent() { + return stats.getCapacityRemainingPercent(); } @Override - public synchronized long getBlockPoolUsed() { - return stats.blockPoolUsed; + public long getBlockPoolUsed() { + return stats.getBlockPoolUsed(); } @Override - public synchronized float getPercentBlockPoolUsed() { - return DFSUtilClient.getPercentUsed(stats.blockPoolUsed, - stats.capacityTotal); + public float getPercentBlockPoolUsed() { + return stats.getPercentBlockPoolUsed(); } @Override - public synchronized long getCapacityUsedNonDFS() { - final long nonDFSUsed = stats.capacityTotal - - stats.capacityRemaining - stats.capacityUsed; - return nonDFSUsed < 0L? 0L : nonDFSUsed; + public long getCapacityUsedNonDFS() { + return stats.getCapacityUsedNonDFS(); } @Override - public synchronized int getXceiverCount() { - return stats.xceiverCount; + public int getXceiverCount() { + return stats.getXceiverCount(); } @Override - public synchronized int getInServiceXceiverCount() { - return stats.nodesInServiceXceiverCount; + public int getInServiceXceiverCount() { + return stats.getNodesInServiceXceiverCount(); } @Override - public synchronized int getNumDatanodesInService() { - return stats.nodesInService; + public int getNumDatanodesInService() { + return stats.getNodesInService(); } @Override - public synchronized long getCacheCapacity() { - return stats.cacheCapacity; + public long getCacheCapacity() { + return stats.getCacheCapacity(); } @Override - public synchronized long getCacheUsed() { - return stats.cacheUsed; + public long getCacheUsed() { + return stats.getCacheUsed(); } - @Override public synchronized long[] getStats() { @@ -195,17 +185,17 @@ public synchronized long[] getStats() { } @Override - public synchronized int getExpiredHeartbeats() { - return stats.expiredHeartbeats; + public int getExpiredHeartbeats() { + return stats.getExpiredHeartbeats(); } @Override - public Map getStorageTypeStats() { - return stats.statsMap.get(); + public Map getStorageTypeStats() { + return stats.getStatsMap(); } synchronized void register(final DatanodeDescriptor d) { - if (!d.isAlive) { + if (!d.isAlive()) { addDatanode(d); //update its timestamp @@ -221,14 +211,14 @@ synchronized void addDatanode(final DatanodeDescriptor d) { // update in-service node count stats.add(d); datanodes.add(d); - d.isAlive = true; + d.setAlive(true); } synchronized void removeDatanode(DatanodeDescriptor node) { - if (node.isAlive) { + if (node.isAlive()) { stats.subtract(node); datanodes.remove(node); - node.isAlive = false; + node.setAlive(false); } } @@ -243,7 +233,7 @@ synchronized void updateHeartbeat(final DatanodeDescriptor node, } synchronized void startDecommission(final DatanodeDescriptor node) { - if (!node.isAlive) { + if (!node.isAlive()) { LOG.info("Dead node {} is decommissioned immediately.", node); node.setDecommissioned(); } else { @@ -255,8 +245,8 @@ synchronized void startDecommission(final DatanodeDescriptor node) { synchronized void stopDecommission(final DatanodeDescriptor node) { LOG.info("Stopping decommissioning of {} node {}", - node.isAlive ? "live" : "dead", node); - if (!node.isAlive) { + node.isAlive() ? "live" : "dead", node); + if (!node.isAlive()) { node.stopDecommission(); } else { stats.subtract(node); @@ -302,6 +292,7 @@ boolean shouldAbortHeartbeatCheck(long offset) { * B. Remove all blocks in PendingDataNodeMessages for the failed storage * when we remove all blocks from BlocksMap for that storage. */ + @VisibleForTesting void heartbeatCheck() { final DatanodeManager dm = blockManager.getDatanodeManager(); // It's OK to check safe mode w/o taking the lock here, we re-check @@ -354,16 +345,14 @@ void heartbeatCheck() { } allAlive = dead == null && failedStorage == null; + if (!allAlive && namesystem.isInStartupSafeMode()) { + return; + } if (dead != null) { // acquire the fsnamesystem lock, and then remove the dead node. namesystem.writeLock(); try { - if (namesystem.isInStartupSafeMode()) { - return; - } - synchronized(this) { - dm.removeDeadDatanode(dead); - } + dm.removeDeadDatanode(dead); } finally { namesystem.writeUnlock(); } @@ -372,12 +361,7 @@ void heartbeatCheck() { // acquire the fsnamesystem lock, and remove blocks on the storage. namesystem.writeLock(); try { - if (namesystem.isInStartupSafeMode()) { - return; - } - synchronized(this) { - blockManager.removeBlocksAssociatedTo(failedStorage); - } + blockManager.removeBlocksAssociatedTo(failedStorage); } finally { namesystem.writeUnlock(); } @@ -385,7 +369,6 @@ void heartbeatCheck() { } } - /** Periodically check heartbeat and update block key */ private class Monitor implements Runnable { private long lastHeartbeatCheck; @@ -404,7 +387,7 @@ public void run() { if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) { synchronized(HeartbeatManager.this) { for(DatanodeDescriptor d : datanodes) { - d.needKeyUpdate = true; + d.setNeedKeyUpdate(true); } } lastBlockKeyUpdate = now; @@ -414,7 +397,7 @@ public void run() { } try { Thread.sleep(5000); // 5 seconds - } catch (InterruptedException ie) { + } catch (InterruptedException ignored) { } // avoid declaring nodes dead for another cycle if a GC pause lasts // longer than the node recheck interval @@ -425,143 +408,4 @@ public void run() { } } } - - /** Datanode statistics. - * For decommissioning/decommissioned nodes, only used capacity is counted. - */ - private static class Stats { - - private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap(); - - private long capacityTotal = 0L; - private long capacityUsed = 0L; - private long capacityRemaining = 0L; - private long blockPoolUsed = 0L; - private int xceiverCount = 0; - private long cacheCapacity = 0L; - private long cacheUsed = 0L; - - private int nodesInService = 0; - private int nodesInServiceXceiverCount = 0; - - private int expiredHeartbeats = 0; - - private void add(final DatanodeDescriptor node) { - capacityUsed += node.getDfsUsed(); - blockPoolUsed += node.getBlockPoolUsed(); - xceiverCount += node.getXceiverCount(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { - nodesInService++; - nodesInServiceXceiverCount += node.getXceiverCount(); - capacityTotal += node.getCapacity(); - capacityRemaining += node.getRemaining(); - } else { - capacityTotal += node.getDfsUsed(); - } - cacheCapacity += node.getCacheCapacity(); - cacheUsed += node.getCacheUsed(); - Set storageTypes = new HashSet<>(); - for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) { - statsMap.addStorage(storageInfo, node); - storageTypes.add(storageInfo.getStorageType()); - } - for (StorageType storageType : storageTypes) { - statsMap.addNode(storageType, node); - } - } - - private void subtract(final DatanodeDescriptor node) { - capacityUsed -= node.getDfsUsed(); - blockPoolUsed -= node.getBlockPoolUsed(); - xceiverCount -= node.getXceiverCount(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { - nodesInService--; - nodesInServiceXceiverCount -= node.getXceiverCount(); - capacityTotal -= node.getCapacity(); - capacityRemaining -= node.getRemaining(); - } else { - capacityTotal -= node.getDfsUsed(); - } - cacheCapacity -= node.getCacheCapacity(); - cacheUsed -= node.getCacheUsed(); - Set storageTypes = new HashSet<>(); - for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) { - statsMap.subtractStorage(storageInfo, node); - storageTypes.add(storageInfo.getStorageType()); - } - for (StorageType storageType : storageTypes) { - statsMap.subtractNode(storageType, node); - } - } - - /** Increment expired heartbeat counter. */ - private void incrExpiredHeartbeats() { - expiredHeartbeats++; - } - } - - /** StorageType specific statistics. - * For decommissioning/decommissioned nodes, only used capacity is counted. - */ - - static final class StorageTypeStatsMap { - - private Map storageTypeStatsMap = - new IdentityHashMap<>(); - - private StorageTypeStatsMap() {} - - private StorageTypeStatsMap(StorageTypeStatsMap other) { - storageTypeStatsMap = - new IdentityHashMap<>(other.storageTypeStatsMap); - for (Map.Entry entry : - storageTypeStatsMap.entrySet()) { - entry.setValue(new StorageTypeStats(entry.getValue())); - } - } - - private Map get() { - return Collections.unmodifiableMap(storageTypeStatsMap); - } - - private void addNode(StorageType storageType, - final DatanodeDescriptor node) { - StorageTypeStats storageTypeStats = - storageTypeStatsMap.get(storageType); - if (storageTypeStats == null) { - storageTypeStats = new StorageTypeStats(); - storageTypeStatsMap.put(storageType, storageTypeStats); - } - storageTypeStats.addNode(node); - } - - private void addStorage(final DatanodeStorageInfo info, - final DatanodeDescriptor node) { - StorageTypeStats storageTypeStats = - storageTypeStatsMap.get(info.getStorageType()); - if (storageTypeStats == null) { - storageTypeStats = new StorageTypeStats(); - storageTypeStatsMap.put(info.getStorageType(), storageTypeStats); - } - storageTypeStats.addStorage(info, node); - } - - private void subtractStorage(final DatanodeStorageInfo info, - final DatanodeDescriptor node) { - StorageTypeStats storageTypeStats = - storageTypeStatsMap.get(info.getStorageType()); - if (storageTypeStats != null) { - storageTypeStats.subtractStorage(info, node); - } - } - - private void subtractNode(StorageType storageType, - final DatanodeDescriptor node) { - StorageTypeStats storageTypeStats = - storageTypeStatsMap.get(storageType); - if (storageTypeStats != null) { - storageTypeStats.subtractNode(node); - } - } - } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java index f4600cb74f..873d19529a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java @@ -86,7 +86,7 @@ void setChosenAsPrimary(boolean chosenAsPrimary) { * Is data-node the replica belongs to alive. */ boolean isAlive() { - return expectedLocation.getDatanodeDescriptor().isAlive; + return expectedLocation.getDatanodeDescriptor().isAlive(); } @Override // Block diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 3559065d70..4fd9ca84fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -934,7 +934,7 @@ public final void processCacheReport(final DatanodeID datanodeID, try { final DatanodeDescriptor datanode = blockManager.getDatanodeManager().getDatanode(datanodeID); - if (datanode == null || !datanode.isAlive) { + if (datanode == null || !datanode.isAlive()) { throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 155abd01c6..86b89a2c0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1768,7 +1768,7 @@ public Boolean get() { FSNamesystem namesystem = cluster.getNamesystem(); final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode( namesystem, nodeID); - return (dd.isAlive == alive); + return (dd.isAlive() == alive); } }, 100, waitTime); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 094794b36d..35ccf91793 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -595,7 +595,7 @@ public void testFavorDecomUntilHardLimit() throws Exception { public void testSafeModeIBR() throws Exception { DatanodeDescriptor node = spy(nodes.get(0)); DatanodeStorageInfo ds = node.getStorageInfos()[0]; - node.isAlive = true; + node.setAlive(true); DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, ""); @@ -640,7 +640,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception { DatanodeDescriptor node = spy(nodes.get(0)); DatanodeStorageInfo ds = node.getStorageInfos()[0]; - node.isAlive = true; + node.setAlive(true); DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, ""); @@ -672,7 +672,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception { DatanodeDescriptor node = nodes.get(0); DatanodeStorageInfo ds = node.getStorageInfos()[0]; - node.isAlive = true; + node.setAlive(true); DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, ""); // register new node diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockUnderConstructionFeature.java index b47aac0824..717292e6e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockUnderConstructionFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockUnderConstructionFeature.java @@ -38,7 +38,9 @@ public void testInitializeBlockRecovery() throws Exception { DatanodeStorageInfo s3 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.3", "s3"); DatanodeDescriptor dd3 = s3.getDatanodeDescriptor(); - dd1.isAlive = dd2.isAlive = dd3.isAlive = true; + dd1.setAlive(true); + dd2.setAlive(true); + dd3.setAlive(true); BlockInfoContiguous blockInfo = new BlockInfoContiguous( new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3); blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java index ca27bb7bb0..9580bae3ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java @@ -66,7 +66,7 @@ public void testBlocksCounter() throws Exception { assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED); assertEquals(1, dd.numBlocks()); // remove a non-existent block - assertFalse(dd.removeBlock(blk1)); + assertFalse(BlocksMap.removeBlock(dd, blk1)); assertEquals(1, dd.numBlocks()); // add an existent block assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED); @@ -75,10 +75,10 @@ public void testBlocksCounter() throws Exception { assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED); assertEquals(2, dd.numBlocks()); // remove first block - assertTrue(dd.removeBlock(blk)); + assertTrue(BlocksMap.removeBlock(dd, blk)); assertEquals(1, dd.numBlocks()); // remove second block - assertTrue(dd.removeBlock(blk1)); + assertTrue(BlocksMap.removeBlock(dd, blk1)); assertEquals(0, dd.numBlocks()); } }