HDFS-13468. Add erasure coding metrics into ReadStatistics. (Contributed by Lei (Eddy) Xu)
This commit is contained in:
parent
7ac0abdc13
commit
a8e428b2dc
@ -19,6 +19,7 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.ReadOption;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
@ -92,6 +93,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
LocatedBlocks locatedBlocks) throws IOException {
|
||||
super(dfsClient, src, verifyChecksum, locatedBlocks);
|
||||
|
||||
this.readStatistics.setBlockType(BlockType.STRIPED);
|
||||
assert ecPolicy != null;
|
||||
this.ecPolicy = ecPolicy;
|
||||
this.cellSize = ecPolicy.getCellSize();
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
|
||||
/**
|
||||
* A utility class that maintains statistics for reading.
|
||||
*/
|
||||
@ -26,6 +28,9 @@ public class ReadStatistics {
|
||||
private long totalShortCircuitBytesRead;
|
||||
private long totalZeroCopyBytesRead;
|
||||
|
||||
private BlockType blockType = BlockType.CONTIGUOUS;
|
||||
private long totalEcDecodingTimeMillis;
|
||||
|
||||
public ReadStatistics() {
|
||||
clear();
|
||||
}
|
||||
@ -75,6 +80,21 @@ public synchronized long getRemoteBytesRead() {
|
||||
return totalBytesRead - totalLocalBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return block type of the input stream. If block type != CONTIGUOUS,
|
||||
* it is reading erasure coded data.
|
||||
*/
|
||||
public synchronized BlockType getBlockType() {
|
||||
return blockType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the total time in milliseconds used for erasure coding decoding.
|
||||
*/
|
||||
public synchronized long getTotalEcDecodingTimeMillis() {
|
||||
return totalEcDecodingTimeMillis;
|
||||
}
|
||||
|
||||
public synchronized void addRemoteBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
}
|
||||
@ -97,10 +117,19 @@ public synchronized void addZeroCopyBytes(long amt) {
|
||||
this.totalZeroCopyBytesRead += amt;
|
||||
}
|
||||
|
||||
public synchronized void addErasureCodingDecodingTime(long millis) {
|
||||
this.totalEcDecodingTimeMillis += millis;
|
||||
}
|
||||
|
||||
synchronized void setBlockType(BlockType blockType) {
|
||||
this.blockType = blockType;
|
||||
}
|
||||
|
||||
public synchronized void clear() {
|
||||
this.totalBytesRead = 0;
|
||||
this.totalLocalBytesRead = 0;
|
||||
this.totalShortCircuitBytesRead = 0;
|
||||
this.totalZeroCopyBytesRead = 0;
|
||||
this.totalEcDecodingTimeMillis = 0;
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
@ -419,6 +420,8 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
|
||||
outputs[i] = decodeInputs[decodeIndices[i]];
|
||||
decodeInputs[decodeIndices[i]] = null;
|
||||
}
|
||||
|
||||
long start = Time.monotonicNow();
|
||||
// Step 2: decode into prepared output buffers
|
||||
decoder.decode(decodeInputs, decodeIndices, outputs);
|
||||
|
||||
@ -432,6 +435,11 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
long end = Time.monotonicNow();
|
||||
// Decoding time includes CPU time on erasure coding and memory copying of
|
||||
// decoded data.
|
||||
dfsStripedInputStream.readStatistics.addErasureCodingDecodingTime(
|
||||
end - start);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -36,11 +36,16 @@
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.ReadStatistics;
|
||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
|
||||
@ -783,4 +788,59 @@ private void testStatistics(boolean isShortCircuit) throws Exception {
|
||||
if (sockDir != null) sockDir.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testStatisticsForErasureCodingRead() throws IOException {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
|
||||
final ErasureCodingPolicy ecPolicy =
|
||||
StripedFileTestUtil.getDefaultECPolicy();
|
||||
final int numDataNodes =
|
||||
ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
|
||||
// The length of test file is one full strip + one partial stripe. And
|
||||
// it is not bound to the stripe cell size.
|
||||
final int length = ecPolicy.getCellSize() * (numDataNodes + 1) + 123;
|
||||
final long randomSeed = 4567L;
|
||||
final short repl = 1;
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(numDataNodes).build()) {
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
fs.enableErasureCodingPolicy(ecPolicy.getName());
|
||||
|
||||
Path ecDir = new Path("/ec");
|
||||
fs.mkdirs(ecDir);
|
||||
fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
|
||||
Path nonEcDir = new Path("/noEc");
|
||||
fs.mkdirs(nonEcDir);
|
||||
|
||||
byte[] buf = new byte[length];
|
||||
|
||||
Path nonEcFile = new Path(nonEcDir, "file1");
|
||||
DFSTestUtil.createFile(fs, nonEcFile, length, repl, randomSeed);
|
||||
try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(nonEcFile)) {
|
||||
IOUtils.readFully(in, buf, 0, length);
|
||||
|
||||
ReadStatistics stats = in.getReadStatistics();
|
||||
Assert.assertEquals(BlockType.CONTIGUOUS, stats.getBlockType());
|
||||
Assert.assertEquals(length, stats.getTotalBytesRead());
|
||||
Assert.assertEquals(length, stats.getTotalLocalBytesRead());
|
||||
}
|
||||
|
||||
Path ecFile = new Path(ecDir, "file2");
|
||||
DFSTestUtil.createFile(fs, ecFile, length, repl, randomSeed);
|
||||
// Shutdown one DataNode so that erasure coding decoding process can kick
|
||||
// in.
|
||||
cluster.shutdownDataNode(0);
|
||||
try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(ecFile)) {
|
||||
IOUtils.readFully(in, buf, 0, length);
|
||||
|
||||
ReadStatistics stats = in.getReadStatistics();
|
||||
Assert.assertEquals(BlockType.STRIPED, stats.getBlockType());
|
||||
Assert.assertEquals(length, stats.getTotalLocalBytesRead());
|
||||
Assert.assertEquals(length, stats.getTotalBytesRead());
|
||||
Assert.assertTrue(stats.getTotalEcDecodingTimeMillis() > 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user