HDFS-12043. Add counters for block re-replication. Contributed by Chen Liang.
This commit is contained in:
parent
72993b33b7
commit
900221f95e
@ -1851,7 +1851,7 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block,
|
|||||||
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
|
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
|
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
|
||||||
int priority) {
|
int priority) {
|
||||||
// skip abandoned block or block reopened for append
|
// skip abandoned block or block reopened for append
|
||||||
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
||||||
@ -1873,6 +1873,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
|
|||||||
if(srcNodes == null || srcNodes.length == 0) {
|
if(srcNodes == null || srcNodes.length == 0) {
|
||||||
// block can not be reconstructed from any node
|
// block can not be reconstructed from any node
|
||||||
LOG.debug("Block {} cannot be reconstructed from any node", block);
|
LOG.debug("Block {} cannot be reconstructed from any node", block);
|
||||||
|
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1885,6 +1886,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
|
|||||||
neededReconstruction.remove(block, priority);
|
neededReconstruction.remove(block, priority);
|
||||||
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
|
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
|
||||||
" it has enough replicas", block);
|
" it has enough replicas", block);
|
||||||
|
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1900,6 +1902,7 @@ private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
|
|||||||
if (block.isStriped()) {
|
if (block.isStriped()) {
|
||||||
if (pendingNum > 0) {
|
if (pendingNum > 0) {
|
||||||
// Wait the previous reconstruction to finish.
|
// Wait the previous reconstruction to finish.
|
||||||
|
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3727,8 +3730,8 @@ private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
|
|||||||
* The given node is reporting that it received a certain block.
|
* The given node is reporting that it received a certain block.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
|
public void addBlock(DatanodeStorageInfo storageInfo, Block block,
|
||||||
throws IOException {
|
String delHint) throws IOException {
|
||||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||||
// Decrement number of blocks scheduled to this datanode.
|
// Decrement number of blocks scheduled to this datanode.
|
||||||
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
|
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
|
||||||
@ -3751,7 +3754,9 @@ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
|
|||||||
BlockInfo storedBlock = getStoredBlock(block);
|
BlockInfo storedBlock = getStoredBlock(block);
|
||||||
if (storedBlock != null &&
|
if (storedBlock != null &&
|
||||||
block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
|
block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
|
||||||
pendingReconstruction.decrement(storedBlock, node);
|
if (pendingReconstruction.decrement(storedBlock, node)) {
|
||||||
|
NameNode.getNameNodeMetrics().incSuccessfulReReplications();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
||||||
delHintNode);
|
delHintNode);
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
@ -97,8 +98,10 @@ void increment(BlockInfo block, DatanodeDescriptor... targets) {
|
|||||||
* for this block.
|
* for this block.
|
||||||
*
|
*
|
||||||
* @param dn The DataNode that finishes the reconstruction
|
* @param dn The DataNode that finishes the reconstruction
|
||||||
|
* @return true if the block is decremented to 0 and got removed.
|
||||||
*/
|
*/
|
||||||
void decrement(BlockInfo block, DatanodeDescriptor dn) {
|
boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
|
||||||
|
boolean removed = false;
|
||||||
synchronized (pendingReconstructions) {
|
synchronized (pendingReconstructions) {
|
||||||
PendingBlockInfo found = pendingReconstructions.get(block);
|
PendingBlockInfo found = pendingReconstructions.get(block);
|
||||||
if (found != null) {
|
if (found != null) {
|
||||||
@ -106,9 +109,11 @@ void decrement(BlockInfo block, DatanodeDescriptor dn) {
|
|||||||
found.decrementReplicas(dn);
|
found.decrementReplicas(dn);
|
||||||
if (found.getNumReplicas() <= 0) {
|
if (found.getNumReplicas() <= 0) {
|
||||||
pendingReconstructions.remove(block);
|
pendingReconstructions.remove(block);
|
||||||
|
removed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -263,6 +268,7 @@ void pendingReconstructionCheck() {
|
|||||||
timedOutItems.add(block);
|
timedOutItems.add(block);
|
||||||
}
|
}
|
||||||
LOG.warn("PendingReconstructionMonitor timed out " + block);
|
LOG.warn("PendingReconstructionMonitor timed out " + block);
|
||||||
|
NameNode.getNameNodeMetrics().incTimeoutReReplications();
|
||||||
iter.remove();
|
iter.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -58,6 +58,12 @@ public class NameNodeMetrics {
|
|||||||
@Metric MutableCounterLong createSymlinkOps;
|
@Metric MutableCounterLong createSymlinkOps;
|
||||||
@Metric MutableCounterLong getLinkTargetOps;
|
@Metric MutableCounterLong getLinkTargetOps;
|
||||||
@Metric MutableCounterLong filesInGetListingOps;
|
@Metric MutableCounterLong filesInGetListingOps;
|
||||||
|
@Metric ("Number of successful re-replications")
|
||||||
|
MutableCounterLong successfulReReplications;
|
||||||
|
@Metric ("Number of times we failed to schedule a block re-replication.")
|
||||||
|
MutableCounterLong numTimesReReplicationNotScheduled;
|
||||||
|
@Metric("Number of timed out block re-replications")
|
||||||
|
MutableCounterLong timeoutReReplications;
|
||||||
@Metric("Number of allowSnapshot operations")
|
@Metric("Number of allowSnapshot operations")
|
||||||
MutableCounterLong allowSnapshotOps;
|
MutableCounterLong allowSnapshotOps;
|
||||||
@Metric("Number of disallowSnapshot operations")
|
@Metric("Number of disallowSnapshot operations")
|
||||||
@ -300,6 +306,18 @@ public void incrTransactionsBatchedInSync(long count) {
|
|||||||
transactionsBatchedInSync.incr(count);
|
transactionsBatchedInSync.incr(count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incSuccessfulReReplications() {
|
||||||
|
successfulReReplications.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumTimesReReplicationNotScheduled() {
|
||||||
|
numTimesReReplicationNotScheduled.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incTimeoutReReplications() {
|
||||||
|
timeoutReReplications.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void addSync(long elapsed) {
|
public void addSync(long elapsed) {
|
||||||
syncs.add(elapsed);
|
syncs.add(elapsed);
|
||||||
for (MutableQuantiles q : syncsQuantiles) {
|
for (MutableQuantiles q : syncsQuantiles) {
|
||||||
|
@ -17,6 +17,10 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
@ -44,6 +48,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@ -178,7 +183,7 @@ public void testPendingReconstruction() {
|
|||||||
public void testProcessPendingReconstructions() throws Exception {
|
public void testProcessPendingReconstructions() throws Exception {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
conf.setLong(
|
conf.setLong(
|
||||||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
|
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
Block block;
|
Block block;
|
||||||
BlockInfo blockInfo;
|
BlockInfo blockInfo;
|
||||||
@ -418,7 +423,7 @@ public void testPendingAndInvalidate() throws Exception {
|
|||||||
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||||
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
DFS_REPLICATION_INTERVAL);
|
DFS_REPLICATION_INTERVAL);
|
||||||
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
CONF.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
||||||
DFS_REPLICATION_INTERVAL);
|
DFS_REPLICATION_INTERVAL);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
|
||||||
DATANODE_COUNT).build();
|
DATANODE_COUNT).build();
|
||||||
@ -471,4 +476,81 @@ public void testPendingAndInvalidate() throws Exception {
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicationCounter() throws Exception {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
|
||||||
|
conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 2);
|
||||||
|
MiniDFSCluster tmpCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
||||||
|
DATANODE_COUNT).build();
|
||||||
|
tmpCluster.waitActive();
|
||||||
|
FSNamesystem fsn = tmpCluster.getNamesystem(0);
|
||||||
|
fsn.writeLock();
|
||||||
|
|
||||||
|
try {
|
||||||
|
BlockManager bm = fsn.getBlockManager();
|
||||||
|
BlocksMap blocksMap = bm.blocksMap;
|
||||||
|
|
||||||
|
// create three blockInfo below, blockInfo0 will success, blockInfo1 will
|
||||||
|
// time out, blockInfo2 will fail the replication.
|
||||||
|
BlockCollection bc0 = Mockito.mock(BlockCollection.class);
|
||||||
|
BlockInfo blockInfo0 = new BlockInfoContiguous((short) 3);
|
||||||
|
blockInfo0.setBlockId(0);
|
||||||
|
|
||||||
|
BlockCollection bc1 = Mockito.mock(BlockCollection.class);
|
||||||
|
BlockInfo blockInfo1 = new BlockInfoContiguous((short) 3);
|
||||||
|
blockInfo1.setBlockId(1);
|
||||||
|
|
||||||
|
BlockCollection bc2 = Mockito.mock(BlockCollection.class);
|
||||||
|
Mockito.when(bc2.getId()).thenReturn((2L));
|
||||||
|
BlockInfo blockInfo2 = new BlockInfoContiguous((short) 3);
|
||||||
|
blockInfo2.setBlockId(2);
|
||||||
|
|
||||||
|
blocksMap.addBlockCollection(blockInfo0, bc0);
|
||||||
|
blocksMap.addBlockCollection(blockInfo1, bc1);
|
||||||
|
blocksMap.addBlockCollection(blockInfo2, bc2);
|
||||||
|
|
||||||
|
PendingReconstructionBlocks pending = bm.pendingReconstruction;
|
||||||
|
|
||||||
|
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
|
||||||
|
assertCounter("SuccessfulReReplications", 0L, rb);
|
||||||
|
assertCounter("NumTimesReReplicationNotScheduled", 0L, rb);
|
||||||
|
assertCounter("TimeoutReReplications", 0L, rb);
|
||||||
|
|
||||||
|
// add block0 and block1 to pending queue.
|
||||||
|
pending.increment(blockInfo0);
|
||||||
|
pending.increment(blockInfo1);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
rb = getMetrics("NameNodeActivity");
|
||||||
|
assertCounter("SuccessfulReReplications", 0L, rb);
|
||||||
|
assertCounter("NumTimesReReplicationNotScheduled", 0L, rb);
|
||||||
|
assertCounter("TimeoutReReplications", 0L, rb);
|
||||||
|
|
||||||
|
// call addBlock on block0 will make it successfully replicated.
|
||||||
|
// not callign addBlock on block1 will make it timeout later.
|
||||||
|
DatanodeStorageInfo[] storageInfos =
|
||||||
|
DFSTestUtil.createDatanodeStorageInfos(1);
|
||||||
|
bm.addBlock(storageInfos[0], blockInfo0, null);
|
||||||
|
|
||||||
|
// call schedule replication on blockInfo2 will fail the re-replication.
|
||||||
|
// because there is no source data to replicate from.
|
||||||
|
bm.scheduleReconstruction(blockInfo2, 0);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
rb = getMetrics("NameNodeActivity");
|
||||||
|
assertCounter("SuccessfulReReplications", 1L, rb);
|
||||||
|
assertCounter("NumTimesReReplicationNotScheduled", 1L, rb);
|
||||||
|
assertCounter("TimeoutReReplications", 1L, rb);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
tmpCluster.shutdown();
|
||||||
|
fsn.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user