From 2b60d0c1f440e61b57085abd2d72a30db7c013cf Mon Sep 17 00:00:00 2001
From: Melissa You <31492618+melissayou@users.noreply.github.com>
Date: Thu, 13 Apr 2023 09:07:42 -0700
Subject: [PATCH] [HDFS-16971] Add read metrics for remote reads in FileSystem
 Statistics #5534 (#5536)

---
 .../java/org/apache/hadoop/fs/FileSystem.java | 34 +++++++++++++++++++
 .../fs/FileSystemStorageStatistics.java       |  5 ++-
 .../fs/TestFileSystemStorageStatistics.java   |  6 +++-
 .../org/apache/hadoop/hdfs/DFSClient.java     | 10 ++++--
 .../apache/hadoop/hdfs/DFSInputStream.java    |  9 +++--
 .../hadoop/hdfs/DFSStripedInputStream.java    |  6 ++--
 .../org/apache/hadoop/hdfs/StripeReader.java  |  5 ++-
 7 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 763af197a1..5d8f0e575f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -3942,6 +3942,7 @@ public static class StatisticsData {
     private volatile long bytesReadDistanceOfThreeOrFour;
     private volatile long bytesReadDistanceOfFiveOrLarger;
     private volatile long bytesReadErasureCoded;
+    private volatile long remoteReadTimeMS;
 
     /**
      * Add another StatisticsData object to this one.
@@ -3959,6 +3960,7 @@ void add(StatisticsData other) {
       this.bytesReadDistanceOfFiveOrLarger +=
           other.bytesReadDistanceOfFiveOrLarger;
       this.bytesReadErasureCoded += other.bytesReadErasureCoded;
+      this.remoteReadTimeMS += other.remoteReadTimeMS;
     }
 
     /**
@@ -3977,6 +3979,7 @@ void negate() {
       this.bytesReadDistanceOfFiveOrLarger =
           -this.bytesReadDistanceOfFiveOrLarger;
       this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
+      this.remoteReadTimeMS = -this.remoteReadTimeMS;
     }
 
     @Override
@@ -4025,6 +4028,10 @@ public long getBytesReadDistanceOfFiveOrLarger() {
     public long getBytesReadErasureCoded() {
       return bytesReadErasureCoded;
     }
+
+    public long getRemoteReadTimeMS() {
+      return remoteReadTimeMS;
+    }
   }
 
   private interface StatisticsAggregator<T> {
@@ -4252,6 +4259,14 @@ public void incrementBytesReadByDistance(int distance, long newBytes) {
       }
     }
 
+    /**
+     * Increment the time taken to read bytes from remote in the statistics.
+     * @param durationMS time taken in ms to read bytes from remote
+     */
+    public void increaseRemoteReadTime(final long durationMS) {
+      getThreadStatistics().remoteReadTimeMS += durationMS;
+    }
+
     /**
      * Apply the given aggregator to all StatisticsData objects associated with
      * this Statistics object.
@@ -4399,6 +4414,25 @@ public long getBytesReadByDistance(int distance) {
       return bytesRead;
     }
 
+    /**
+     * Get total time taken in ms for bytes read from remote.
+     * @return time taken in ms for remote bytes read.
+     */
+    public long getRemoteReadTime() {
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long remoteReadTimeMS = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          remoteReadTimeMS += data.remoteReadTimeMS;
+        }
+
+        public Long aggregate() {
+          return remoteReadTimeMS;
+        }
+      });
+    }
+
     /**
      * Get all statistics data.
      * MR or other frameworks can use the method to get all statistics at once.
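Note: with the accessors above, remoteReadTimeMS aggregates across threads exactly like the existing byte counters. A minimal sketch of reading the metric back from application code (the Configuration, the path, and the drain loop are illustrative assumptions, not part of this patch):

    // Assumes org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.* imports.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) { // hypothetical path
      byte[] buf = new byte[4096];
      while (in.read(buf) != -1) {
        // drain the stream so some bytes are actually read
      }
    }
    FileSystem.Statistics stats =
        FileSystem.getStatistics(fs.getUri().getScheme(), fs.getClass());
    long remoteReadMS = stats.getRemoteReadTime(); // getter added by this patch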
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
index 62806d61b5..9e62e63775 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
@@ -47,7 +47,8 @@ public class FileSystemStorageStatistics extends StorageStatistics {
       "bytesReadDistanceOfOneOrTwo",
       "bytesReadDistanceOfThreeOrFour",
       "bytesReadDistanceOfFiveOrLarger",
-      "bytesReadErasureCoded"
+      "bytesReadErasureCoded",
+      "remoteReadTimeMS"
   };
 
   private static class LongStatisticIterator
@@ -107,6 +108,8 @@ private static Long fetch(StatisticsData data, String key) {
       return data.getBytesReadDistanceOfFiveOrLarger();
     case "bytesReadErasureCoded":
       return data.getBytesReadErasureCoded();
+    case "remoteReadTimeMS":
+      return data.getRemoteReadTimeMS();
     default:
       return null;
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
index 2b4e686e59..e99f0f2348 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java
@@ -52,7 +52,8 @@ public class TestFileSystemStorageStatistics {
       "bytesReadDistanceOfOneOrTwo",
       "bytesReadDistanceOfThreeOrFour",
       "bytesReadDistanceOfFiveOrLarger",
-      "bytesReadErasureCoded"
+      "bytesReadErasureCoded",
+      "remoteReadTimeMS"
   };
 
   private FileSystem.Statistics statistics =
@@ -74,6 +75,7 @@ public void setup() {
     statistics.incrementBytesReadByDistance(1, RandomUtils.nextInt(0, 100));
     statistics.incrementBytesReadByDistance(3, RandomUtils.nextInt(0, 100));
     statistics.incrementBytesReadErasureCoded(RandomUtils.nextInt(0, 100));
+    statistics.increaseRemoteReadTime(RandomUtils.nextInt(0, 100));
   }
 
   @Test
@@ -128,6 +130,8 @@ private long getStatisticsValue(String name) {
       return statistics.getBytesReadByDistance(5);
     case "bytesReadErasureCoded":
       return statistics.getBytesReadErasureCoded();
+    case "remoteReadTimeMS":
+      return statistics.getRemoteReadTime();
     default:
       return 0;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index acfca6799f..8faeebe8e8 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3090,10 +3090,14 @@ public Peer newConnectedPeer(InetSocketAddress addr,
     }
   }
 
-  void updateFileSystemReadStats(int distance, int nRead) {
+  void updateFileSystemReadStats(int distance, int readBytes, long readTimeMS) {
     if (stats != null) {
-      stats.incrementBytesRead(nRead);
-      stats.incrementBytesReadByDistance(distance, nRead);
+      stats.incrementBytesRead(readBytes);
+      stats.incrementBytesReadByDistance(distance, readBytes);
+      if (distance > 0) {
+        //remote read
+        stats.increaseRemoteReadTime(readTimeMS);
+      }
     }
   }
 
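Note: DFSClient only charges the new counter when the network distance is greater than zero, i.e. when the block was not served by a DataNode local to the client; local reads still count toward bytes read but not toward remote read time. Because "remoteReadTimeMS" is registered in FileSystemStorageStatistics above, the value is also reachable through the generic StorageStatistics API. A hedged sketch, with fs an open FileSystem as in the earlier example:

    // The key name comes from this patch; getLong returns null for unknown keys.
    StorageStatistics storageStats = fs.getStorageStatistics();
    Long remoteReadTimeMS = storageStats.getLong("remoteReadTimeMS");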
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index a8d8001607..b5be33206e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -851,8 +851,9 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
                 locatedBlocks.getFileLength() - pos);
           }
         }
+        long beginReadMS = Time.monotonicNow();
         int result = readBuffer(strategy, realLen, corruptedBlocks);
-
+        long readTimeMS = Time.monotonicNow() - beginReadMS;
         if (result >= 0) {
           pos += result;
         } else {
@@ -861,7 +862,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
         }
         updateReadStatistics(readStatistics, result, blockReader);
         dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-            result);
+            result, readTimeMS);
         if (readStatistics.getBlockType() == BlockType.STRIPED) {
           dfsClient.updateFileSystemECReadStats(result);
         }
@@ -1184,6 +1185,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
         ByteBuffer tmp = buf.duplicate();
         tmp.limit(tmp.position() + len);
         tmp = tmp.slice();
+        long beginReadMS = Time.monotonicNow();
         int nread = 0;
         int ret;
         while (true) {
@@ -1193,11 +1195,12 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
           }
           nread += ret;
         }
+        long readTimeMS = Time.monotonicNow() - beginReadMS;
         buf.position(buf.position() + nread);
 
         IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
         dfsClient.updateFileSystemReadStats(
-            reader.getNetworkDistance(), nread);
+            reader.getNetworkDistance(), nread, readTimeMS);
         if (readStatistics.getBlockType() == BlockType.STRIPED) {
           dfsClient.updateFileSystemECReadStats(nread);
         }
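Note: both read paths take the interval with Time.monotonicNow() rather than System.currentTimeMillis(), so wall-clock adjustments cannot skew the duration; in readWithStrategy the interval brackets the whole readBuffer() call, so time spent in its internal retries is counted as well. The pattern in isolation (a sketch; reader and buffer stand in for the surrounding code):

    long beginReadMS = Time.monotonicNow();  // org.apache.hadoop.util.Time
    int nread = reader.read(buffer);         // the potentially remote read
    long readTimeMS = Time.monotonicNow() - beginReadMS;
    // readTimeMS is attributed to the remote counter only when distance > 0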
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 5ae5170959..6c1bafbef9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -331,15 +331,17 @@ private void readOneStripe(CorruptedBlocks corruptedBlocks)
    * its ThreadLocal.
    *
    * @param stats striped read stats
+   * @param readTimeMS read time metrics in ms
+   *
    */
-  void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
+  void updateReadStats(final StripedBlockUtil.BlockReadStats stats, long readTimeMS) {
     if (stats == null) {
       return;
     }
     updateReadStatistics(readStatistics, stats.getBytesRead(),
         stats.isShortCircuit(), stats.getNetworkDistance());
     dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
-        stats.getBytesRead());
+        stats.getBytesRead(), readTimeMS);
     assert readStatistics.getBlockType() == BlockType.STRIPED;
     dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 3fc87c7952..f2d6732a45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -351,9 +351,12 @@ void readStripe() throws IOException {
     // first read failure
     while (!futures.isEmpty()) {
       try {
+        long beginReadMS = Time.monotonicNow();
         StripingChunkReadResult r = StripedBlockUtil
             .getNextCompletedStripedRead(service, futures, 0);
-        dfsStripedInputStream.updateReadStats(r.getReadStats());
+        long readTimeMS = Time.monotonicNow() - beginReadMS;
+
+        dfsStripedInputStream.updateReadStats(r.getReadStats(), readTimeMS);
         DFSClient.LOG.debug("Read task returned: {}, for stripe {}", r,
             alignedStripe);
         StripingChunk returnedChunk = alignedStripe.chunks[r.index];
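Note: in the striped path the measured interval covers the wait in getNextCompletedStripedRead for an asynchronous chunk read to finish, and the result is attributed to the returned chunk's block stats. A quick sanity check of the end-to-end plumbing, modeled on the unit test above (scheme and value are arbitrary):

    FileSystem.Statistics statistics = new FileSystem.Statistics("test-scheme");
    statistics.increaseRemoteReadTime(42L);
    assert statistics.getRemoteReadTime() == 42L;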