diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index e7d90edf37..339a02c84b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -92,6 +93,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       LocatedBlocks locatedBlocks) throws IOException {
     super(dfsClient, src, verifyChecksum, locatedBlocks);
 
+    this.readStatistics.setBlockType(BlockType.STRIPED);
     assert ecPolicy != null;
     this.ecPolicy = ecPolicy;
     this.cellSize = ecPolicy.getCellSize();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
index 59b1418d95..af53f0abf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.hdfs.protocol.BlockType;
+
 /**
  * A utility class that maintains statistics for reading.
  */
@@ -26,6 +28,9 @@ public class ReadStatistics {
   private long totalShortCircuitBytesRead;
   private long totalZeroCopyBytesRead;
 
+  private BlockType blockType = BlockType.CONTIGUOUS;
+  private long totalEcDecodingTimeMillis;
+
   public ReadStatistics() {
     clear();
   }
@@ -75,6 +80,21 @@ public synchronized long getRemoteBytesRead() {
     return totalBytesRead - totalLocalBytesRead;
   }
 
+  /**
+   * @return block type of the input stream. If block type != CONTIGUOUS,
+   * it is reading erasure coded data.
+   */
+  public synchronized BlockType getBlockType() {
+    return blockType;
+  }
+
+  /**
+   * Return the total time in milliseconds used for erasure coding decoding.
+   */
+  public synchronized long getTotalEcDecodingTimeMillis() {
+    return totalEcDecodingTimeMillis;
+  }
+
   public synchronized void addRemoteBytes(long amt) {
     this.totalBytesRead += amt;
   }
@@ -97,10 +117,19 @@ public synchronized void addZeroCopyBytes(long amt) {
     this.totalZeroCopyBytesRead += amt;
   }
 
+  public synchronized void addErasureCodingDecodingTime(long millis) {
+    this.totalEcDecodingTimeMillis += millis;
+  }
+
+  synchronized void setBlockType(BlockType blockType) {
+    this.blockType = blockType;
+  }
+
   public synchronized void clear() {
     this.totalBytesRead = 0;
     this.totalLocalBytesRead = 0;
     this.totalShortCircuitBytesRead = 0;
     this.totalZeroCopyBytesRead = 0;
+    this.totalEcDecodingTimeMillis = 0;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 9a204230ab..0554ebed0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.io.erasurecode.ECChunk;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+import org.apache.hadoop.util.Time;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -419,6 +420,8 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
       outputs[i] = decodeInputs[decodeIndices[i]];
       decodeInputs[decodeIndices[i]] = null;
     }
+
+    long start = Time.monotonicNow();
     // Step 2: decode into prepared output buffers
     decoder.decode(decodeInputs, decodeIndices, outputs);
 
@@ -432,6 +435,11 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
         }
       }
     }
+    long end = Time.monotonicNow();
+    // Decoding time includes CPU time on erasure coding and memory copying of
+    // decoded data.
+    dfsStripedInputStream.readStatistics.addErasureCodingDecodingTime(
+        end - start);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index bd0c7c32c9..b9bb495696 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -36,11 +36,16 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.ReadStatistics;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.BlockType;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
@@ -783,4 +788,59 @@ private void testStatistics(boolean isShortCircuit) throws Exception {
       if (sockDir != null) sockDir.close();
     }
   }
+
+  @Test(timeout = 60000)
+  public void testStatisticsForErasureCodingRead() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+
+    final ErasureCodingPolicy ecPolicy =
+        StripedFileTestUtil.getDefaultECPolicy();
+    final int numDataNodes =
+        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+    // The length of the test file is one full stripe plus one partial
+    // stripe, and it is not aligned to the stripe cell size.
+    final int length = ecPolicy.getCellSize() * (numDataNodes + 1) + 123;
+    final long randomSeed = 4567L;
+    final short repl = 1;
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      fs.enableErasureCodingPolicy(ecPolicy.getName());
+
+      Path ecDir = new Path("/ec");
+      fs.mkdirs(ecDir);
+      fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
+      Path nonEcDir = new Path("/noEc");
+      fs.mkdirs(nonEcDir);
+
+      byte[] buf = new byte[length];
+
+      Path nonEcFile = new Path(nonEcDir, "file1");
+      DFSTestUtil.createFile(fs, nonEcFile, length, repl, randomSeed);
+      try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(nonEcFile)) {
+        IOUtils.readFully(in, buf, 0, length);
+
+        ReadStatistics stats = in.getReadStatistics();
+        Assert.assertEquals(BlockType.CONTIGUOUS, stats.getBlockType());
+        Assert.assertEquals(length, stats.getTotalBytesRead());
+        Assert.assertEquals(length, stats.getTotalLocalBytesRead());
+      }
+
+      Path ecFile = new Path(ecDir, "file2");
+      DFSTestUtil.createFile(fs, ecFile, length, repl, randomSeed);
+      // Shut down one DataNode so that the erasure coding decoding process
+      // can kick in.
+      cluster.shutdownDataNode(0);
+      try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(ecFile)) {
+        IOUtils.readFully(in, buf, 0, length);
+
+        ReadStatistics stats = in.getReadStatistics();
+        Assert.assertEquals(BlockType.STRIPED, stats.getBlockType());
+        Assert.assertEquals(length, stats.getTotalLocalBytesRead());
+        Assert.assertEquals(length, stats.getTotalBytesRead());
+        Assert.assertTrue(stats.getTotalEcDecodingTimeMillis() > 0);
+      }
+    }
+  }
 }
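
Usage note (not part of the patch): the sketch below shows how a client might surface the new statistics once this change is applied. It relies on HdfsDataInputStream.getReadStatistics() plus the getBlockType() and getTotalEcDecodingTimeMillis() accessors added above; the class name EcReadStatsExample, the command-line path argument, and the 8192-byte read size are illustrative assumptions only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ReadStatistics;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.BlockType;

public class EcReadStatsExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical: the first argument is the HDFS path of the file to sample.
    Path file = new Path(args[0]);
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(file.toUri(), conf);
         HdfsDataInputStream in = (HdfsDataInputStream) fs.open(file)) {
      // Read a small prefix of the file; the size is arbitrary for the demo.
      byte[] buf = new byte[8192];
      int n = in.read(buf);

      ReadStatistics stats = in.getReadStatistics();
      System.out.println("bytes read: " + n
          + ", total bytes counted: " + stats.getTotalBytesRead());
      System.out.println("block type: " + stats.getBlockType());
      if (stats.getBlockType() == BlockType.STRIPED) {
        // Stays 0 unless missing or corrupt cells actually forced decoding.
        System.out.println("EC decoding time (ms): "
            + stats.getTotalEcDecodingTimeMillis());
      }
    }
  }
}

Because addErasureCodingDecodingTime() is only called from the striped read path in StripeReader, the counter remains zero for contiguous files and for healthy erasure-coded reads that never need reconstruction.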