diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a0c4698421..a5ee30bc1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1851,7 +1851,7 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block,
         (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
   }
 
-  private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
+  BlockReconstructionWork scheduleReconstruction(BlockInfo block,
       int priority) {
     // skip abandoned block or block reopened for append
     if (block.isDeleted() || !block.isCompleteOrCommitted()) {
@@ -1873,6 +1873,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
     if(srcNodes == null || srcNodes.length == 0) {
       // block can not be reconstructed from any node
       LOG.debug("Block {} cannot be reconstructed from any node", block);
+      NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
       return null;
     }
 
@@ -1885,6 +1886,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
       neededReconstruction.remove(block, priority);
       blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
           " it has enough replicas", block);
+      NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
       return null;
     }
 
@@ -1900,6 +1902,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
     if (block.isStriped()) {
       if (pendingNum > 0) {
         // Wait the previous reconstruction to finish.
+        NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
         return null;
       }
 
@@ -3727,8 +3730,8 @@ private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
-      throws IOException {
+  public void addBlock(DatanodeStorageInfo storageInfo, Block block,
+      String delHint) throws IOException {
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
@@ -3751,7 +3754,9 @@ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null &&
         block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
-      pendingReconstruction.decrement(storedBlock, node);
+      if (pendingReconstruction.decrement(storedBlock, node)) {
+        NameNode.getNameNodeMetrics().incSuccessfulReReplications();
+      }
     }
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
index 2221d1d654..0f20daae39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -30,6 +30,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
@@ -97,8 +98,10 @@ void increment(BlockInfo block, DatanodeDescriptor... targets) {
    * for this block.
    *
    * @param dn The DataNode that finishes the reconstruction
+   * @return true if the block is decremented to 0 and removed from the map.
    */
-  void decrement(BlockInfo block, DatanodeDescriptor dn) {
+  boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+    boolean removed = false;
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
       if (found != null) {
@@ -106,9 +109,11 @@ void decrement(BlockInfo block, DatanodeDescriptor dn) {
         found.decrementReplicas(dn);
         if (found.getNumReplicas() <= 0) {
           pendingReconstructions.remove(block);
+          removed = true;
         }
       }
     }
+    return removed;
   }
 
   /**
@@ -263,6 +268,7 @@ void pendingReconstructionCheck() {
             timedOutItems.add(block);
           }
           LOG.warn("PendingReconstructionMonitor timed out " + block);
+          NameNode.getNameNodeMetrics().incTimeoutReReplications();
           iter.remove();
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index cb81f5a376..f2534e42d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -58,6 +58,12 @@ public class NameNodeMetrics {
   @Metric MutableCounterLong createSymlinkOps;
   @Metric MutableCounterLong getLinkTargetOps;
   @Metric MutableCounterLong filesInGetListingOps;
+  @Metric("Number of successful re-replications")
+  MutableCounterLong successfulReReplications;
+  @Metric("Number of times we failed to schedule a block re-replication")
+  MutableCounterLong numTimesReReplicationNotScheduled;
+  @Metric("Number of timed-out block re-replications")
+  MutableCounterLong timeoutReReplications;
   @Metric("Number of allowSnapshot operations")
   MutableCounterLong allowSnapshotOps;
   @Metric("Number of disallowSnapshot operations")
@@ -300,6 +306,18 @@ public void incrTransactionsBatchedInSync(long count) {
     transactionsBatchedInSync.incr(count);
   }
 
+  public void incSuccessfulReReplications() {
+    successfulReReplications.incr();
+  }
+
+  public void incNumTimesReReplicationNotScheduled() {
+    numTimesReReplicationNotScheduled.incr();
+  }
+
+  public void incTimeoutReReplications() {
+    timeoutReReplications.incr();
+  }
+
   public void addSync(long elapsed) {
     syncs.add(elapsed);
     for (MutableQuantiles q : syncsQuantiles) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
index 7679f9ddc8..042eae7033 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -44,6 +48,7 @@
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -178,7 +183,7 @@ public void testPendingReconstruction() {
   public void testProcessPendingReconstructions() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     conf.setLong(
-        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
+        DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
     MiniDFSCluster cluster = null;
     Block block;
     BlockInfo blockInfo;
@@ -418,7 +423,7 @@ public void testPendingAndInvalidate() throws Exception {
     CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_REPLICATION_INTERVAL);
-    CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+    CONF.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
         DFS_REPLICATION_INTERVAL);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
         DATANODE_COUNT).build();
@@ -471,4 +476,81 @@ public void testPendingAndInvalidate() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReplicationCounter() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 2);
+    MiniDFSCluster tmpCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATANODE_COUNT).build();
+    tmpCluster.waitActive();
+    FSNamesystem fsn = tmpCluster.getNamesystem(0);
+    fsn.writeLock();
+
+    try {
+      BlockManager bm = fsn.getBlockManager();
+      BlocksMap blocksMap = bm.blocksMap;
+
+      // Create three blocks: blockInfo0 will be replicated successfully,
+      // blockInfo1 will time out, and blockInfo2 will fail to be scheduled.
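+      // (blockInfo2 is never attached to a DatanodeStorageInfo, so
+      // scheduleReconstruction will find no source node for it.)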
+      BlockCollection bc0 = Mockito.mock(BlockCollection.class);
+      BlockInfo blockInfo0 = new BlockInfoContiguous((short) 3);
+      blockInfo0.setBlockId(0);
+
+      BlockCollection bc1 = Mockito.mock(BlockCollection.class);
+      BlockInfo blockInfo1 = new BlockInfoContiguous((short) 3);
+      blockInfo1.setBlockId(1);
+
+      BlockCollection bc2 = Mockito.mock(BlockCollection.class);
+      Mockito.when(bc2.getId()).thenReturn(2L);
+      BlockInfo blockInfo2 = new BlockInfoContiguous((short) 3);
+      blockInfo2.setBlockId(2);
+
+      blocksMap.addBlockCollection(blockInfo0, bc0);
+      blocksMap.addBlockCollection(blockInfo1, bc1);
+      blocksMap.addBlockCollection(blockInfo2, bc2);
+
+      PendingReconstructionBlocks pending = bm.pendingReconstruction;
+
+      MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+      assertCounter("SuccessfulReReplications", 0L, rb);
+      assertCounter("NumTimesReReplicationNotScheduled", 0L, rb);
+      assertCounter("TimeoutReReplications", 0L, rb);
+
+      // Add block0 and block1 to the pending queue.
+      pending.increment(blockInfo0);
+      pending.increment(blockInfo1);
+
+      Thread.sleep(2000);
+
+      rb = getMetrics("NameNodeActivity");
+      assertCounter("SuccessfulReReplications", 0L, rb);
+      assertCounter("NumTimesReReplicationNotScheduled", 0L, rb);
+      assertCounter("TimeoutReReplications", 0L, rb);
+
+      // Calling addBlock on block0 marks its re-replication as successful;
+      // not calling addBlock on block1 lets it time out later.
+      DatanodeStorageInfo[] storageInfos =
+          DFSTestUtil.createDatanodeStorageInfos(1);
+      bm.addBlock(storageInfos[0], blockInfo0, null);
+
+      // Scheduling reconstruction for blockInfo2 fails the re-replication
+      // because there is no source node to replicate from.
+      bm.scheduleReconstruction(blockInfo2, 0);
+
+      Thread.sleep(2000);
+
+      rb = getMetrics("NameNodeActivity");
+      assertCounter("SuccessfulReReplications", 1L, rb);
+      assertCounter("NumTimesReReplicationNotScheduled", 1L, rb);
+      assertCounter("TimeoutReReplications", 1L, rb);
+
+    } finally {
+      tmpCluster.shutdown();
+      fsn.writeUnlock();
+    }
+  }
+
 }