diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 8210eee038..43a3f331f0 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -453,6 +453,22 @@ Each metrics record contains tags such as SessionId and Hostname as additional i | `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker | | `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker | | `EcReconstructionRemoteBytesRead` | Total number of bytes remote read by erasure coding worker | +| `CreateRbwOpNumOps` | Total number of create rbw operations | +| `CreateRbwOpAvgTime` | Average time of create rbw operations in milliseconds | +| `RecoverRbwOpNumOps` | Total number of recovery rbw operations | +| `RecoverRbwOpAvgTime` | Average time of recovery rbw operations in milliseconds | +| `ConvertTemporaryToRbwOpNumOps` | Total number of convert temporary to rbw operations | +| `ConvertTemporaryToRbwOpAvgTime` | Average time of convert temporary to rbw operations in milliseconds | +| `CreateTemporaryOpNumOps` | Total number of create temporary operations | +| `CreateTemporaryOpAvgTime` | Average time of create temporary operations in milliseconds | +| `FinalizeBlockOpNumOps` | Total number of finalize block operations | +| `FinalizeBlockOpAvgTime` | Average time of finalize block operations in milliseconds | +| `UnfinalizeBlockOpNumOps` | Total number of un-finalize block operations | +| `UnfinalizeBlockOpAvgTime` | Average time of un-finalize block operations in milliseconds | +| `CheckAndUpdateOpNumOps` | Total number of check and update operations | +| `CheckAndUpdateOpAvgTime` | Average time of check and update operations in milliseconds | +| `UpdateReplicaUnderRecoveryOpNumOps` | Total number of update replica under recovery operations | +| `UpdateReplicaUnderRecoveryOpAvgTime` | Average time of update replica under recovery operations in milliseconds | FsVolume -------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 39152fccb8..d6b50e1364 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -243,6 +244,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) } final DataNode datanode; + private final DataNodeMetrics dataNodeMetrics; final DataStorage dataStorage; private final FsVolumeList volumes; final Map storageMap; @@ -284,6 +286,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) ) throws IOException { this.fsRunning = true; this.datanode = datanode; + this.dataNodeMetrics = datanode.getMetrics(); this.dataStorage = storage; this.conf = conf; this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); @@ -1425,6 +1428,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS, public ReplicaHandler createRbw( StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { + long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); @@ -1485,6 +1489,11 @@ public ReplicaHandler createRbw( volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); return new ReplicaHandler(newReplicaInfo, ref); + } finally { + if (dataNodeMetrics != null) { + long createRbwMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addCreateRbwOp(createRbwMs); + } } } @@ -1493,27 +1502,34 @@ public ReplicaHandler recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { LOG.info("Recover RBW replica " + b); - - while (true) { - try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { - ReplicaInfo replicaInfo = - getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); - // check the replica's state - if (replicaInfo.getState() != ReplicaState.RBW) { - throw new ReplicaNotFoundException( - ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); + long startTimeMs = Time.monotonicNow(); + try { + while (true) { + try { + try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + ReplicaInfo replicaInfo = + getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); + // check the replica's state + if (replicaInfo.getState() != ReplicaState.RBW) { + throw new ReplicaNotFoundException( + ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); + } + ReplicaInPipeline rbw = (ReplicaInPipeline) replicaInfo; + if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { + throw new MustStopExistingWriter(rbw); + } + LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw); + return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd); } - ReplicaInPipeline rbw = (ReplicaInPipeline)replicaInfo; - if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { - throw new MustStopExistingWriter(rbw); - } - LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw); - return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd); + } catch (MustStopExistingWriter e) { + e.getReplicaInPipeline().stopWriter( + datanode.getDnConf().getXceiverStopTimeout()); } - } catch (MustStopExistingWriter e) { - e.getReplicaInPipeline().stopWriter( - datanode.getDnConf().getXceiverStopTimeout()); + } + } finally { + if (dataNodeMetrics != null) { + long recoverRbwMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addRecoverRbwOp(recoverRbwMs); } } } @@ -1581,7 +1597,7 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, @Override // FsDatasetSpi public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) throws IOException { - + long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); @@ -1637,6 +1653,11 @@ public ReplicaInPipeline convertTemporaryToRbw( // overwrite the RBW in the volume map volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo()); return rbw; + } finally { + if (dataNodeMetrics != null) { + long convertTemporaryToRbwMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addConvertTemporaryToRbwOp(convertTemporaryToRbwMs); + } } } @@ -1701,6 +1722,7 @@ public ReplicaHandler createTemporary(StorageType storageType, // Stop the previous writer ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs); } while (true); + long holdLockTimeMs = Time.monotonicNow() - startTimeMs; if (lastFoundReplicaInfo != null && !isReplicaProvided(lastFoundReplicaInfo)) { // Old blockfile should be deleted synchronously as it might collide @@ -1709,6 +1731,7 @@ public ReplicaHandler createTemporary(StorageType storageType, invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }, false); } + long startHoldLockTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b .getNumBytes()); @@ -1723,6 +1746,13 @@ public ReplicaHandler createTemporary(StorageType storageType, volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); return new ReplicaHandler(newReplicaInfo, ref); + } finally { + if (dataNodeMetrics != null) { + // Create temporary operation hold write lock twice. + long createTemporaryOpMs = Time.monotonicNow() - startHoldLockTimeMs + + holdLockTimeMs; + dataNodeMetrics.addCreateTemporaryOp(createTemporaryOpMs); + } } } @@ -1760,6 +1790,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException { ReplicaInfo replicaInfo = null; ReplicaInfo finalizedReplicaInfo = null; + long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads @@ -1772,6 +1803,11 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) return; } finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo); + } finally { + if (dataNodeMetrics != null) { + long finalizeBlockMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addFinalizeBlockOp(finalizeBlockMs); + } } /* * Sync the directory after rename from tmp/rbw to Finalized if @@ -1836,6 +1872,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) */ @Override // FsDatasetSpi public void unfinalizeBlock(ExtendedBlock b) throws IOException { + long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); @@ -1853,6 +1890,11 @@ public void unfinalizeBlock(ExtendedBlock b) throws IOException { b.getBlockId(), true); } } + } finally { + if (dataNodeMetrics != null) { + long unFinalizedBlockMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addUnfinalizeBlockOp(unFinalizedBlockMs); + } } } @@ -2406,6 +2448,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo) Block corruptBlock = null; ReplicaInfo memBlockInfo; + long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { memBlockInfo = volumeMap.get(bpid, blockId); if (memBlockInfo != null && @@ -2581,6 +2624,11 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo) + memBlockInfo.getBlockDataLength()); memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength()); } + } finally { + if (dataNodeMetrics != null) { + long checkAndUpdateTimeMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addCheckAndUpdateOp(checkAndUpdateTimeMs); + } } // Send corrupt block report outside the lock @@ -2714,6 +2762,7 @@ public Replica updateReplicaUnderRecovery( final long recoveryId, final long newBlockId, final long newlength) throws IOException { + long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = datasetWriteLock.acquire()) { //get replica final String bpid = oldBlock.getBlockPoolId(); @@ -2770,6 +2819,12 @@ public Replica updateReplicaUnderRecovery( checkReplicaFiles(finalized); return finalized; + } finally { + if (dataNodeMetrics != null) { + long updateReplicaUnderRecoveryMs = Time.monotonicNow() - startTimeMs; + dataNodeMetrics.addUpdateReplicaUnderRecoveryOp( + updateReplicaUnderRecoveryMs); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index cc802375f9..00093f7543 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -167,6 +167,16 @@ public class DataNodeMetrics { @Metric("Rate of processed commands of all BPServiceActors") private MutableRate processedCommandsOp; + // FsDatasetImpl local file process metrics. + @Metric private MutableRate createRbwOp; + @Metric private MutableRate recoverRbwOp; + @Metric private MutableRate convertTemporaryToRbwOp; + @Metric private MutableRate createTemporaryOp; + @Metric private MutableRate finalizeBlockOp; + @Metric private MutableRate unfinalizeBlockOp; + @Metric private MutableRate checkAndUpdateOp; + @Metric private MutableRate updateReplicaUnderRecoveryOp; + final MetricsRegistry registry = new MetricsRegistry("datanode"); @Metric("Milliseconds spent on calling NN rpc") private MutableRatesWithAggregation @@ -574,4 +584,68 @@ public void incrNumProcessedCommands() { public void addNumProcessedCommands(long latency) { processedCommandsOp.add(latency); } + + /** + * Add addCreateRbwOp metrics. + * @param latency milliseconds of create RBW file + */ + public void addCreateRbwOp(long latency) { + createRbwOp.add(latency); + } + + /** + * Add addRecoverRbwOp metrics. + * @param latency milliseconds of recovery RBW file + */ + public void addRecoverRbwOp(long latency) { + recoverRbwOp.add(latency); + } + + /** + * Add addConvertTemporaryToRbwOp metrics. + * @param latency milliseconds of convert temporary to RBW file + */ + public void addConvertTemporaryToRbwOp(long latency) { + convertTemporaryToRbwOp.add(latency); + } + + /** + * Add addCreateTemporaryOp metrics. + * @param latency milliseconds of create temporary block file + */ + public void addCreateTemporaryOp(long latency) { + createTemporaryOp.add(latency); + } + + /** + * Add addFinalizeBlockOp metrics. + * @param latency milliseconds of finalize block + */ + public void addFinalizeBlockOp(long latency) { + finalizeBlockOp.add(latency); + } + + /** + * Add addUnfinalizeBlockOp metrics. + * @param latency milliseconds of un-finalize block file + */ + public void addUnfinalizeBlockOp(long latency) { + unfinalizeBlockOp.add(latency); + } + + /** + * Add addCheckAndUpdateOp metrics. + * @param latency milliseconds of check and update block file + */ + public void addCheckAndUpdateOp(long latency) { + checkAndUpdateOp.add(latency); + } + + /** + * Add addUpdateReplicaUnderRecoveryOp metrics. + * @param latency milliseconds of update and replica under recovery block file + */ + public void addUpdateReplicaUnderRecoveryOp(long latency) { + updateReplicaUnderRecoveryOp.add(latency); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index b4e26405ca..81d144f05e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -37,6 +37,7 @@ import com.google.common.collect.Lists; import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,6 +160,50 @@ public void testReceivePacketMetrics() throws Exception { } } + /** + * HDFS-15242: This function ensures that writing causes some metrics + * of FSDatasetImpl to increment. + */ + @Test + public void testFsDatasetMetrics() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId(); + List datanodes = cluster.getDataNodes(); + DataNode datanode = datanodes.get(0); + + // Verify both of metrics set to 0 when initialize. + MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name()); + assertCounter("CreateRbwOpNumOps", 0L, rb); + assertCounter("CreateTemporaryOpNumOps", 0L, rb); + assertCounter("FinalizeBlockOpNumOps", 0L, rb); + + // Write into a file to trigger DN metrics. + DistributedFileSystem fs = cluster.getFileSystem(); + Path testFile = new Path("/testBlockMetrics.txt"); + FSDataOutputStream fout = fs.create(testFile); + fout.write(new byte[1]); + fout.hsync(); + fout.close(); + + // Create temporary block file to trigger DN metrics. + final ExtendedBlock block = new ExtendedBlock(bpid, 1, 1, 2001); + datanode.data.createTemporary(StorageType.DEFAULT, null, block, false); + + // Verify both of metrics value has updated after do some operations. + rb = getMetrics(datanode.getMetrics().name()); + assertCounter("CreateRbwOpNumOps", 1L, rb); + assertCounter("CreateTemporaryOpNumOps", 1L, rb); + assertCounter("FinalizeBlockOpNumOps", 1L, rb); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * Tests that round-trip acks in a datanode write pipeline are correctly * measured.