HADOOP-15507. Add MapReduce counters about EC bytes read.
This commit is contained in:
parent
8d31ddcfeb
commit
6d5e87aec2
@ -3605,6 +3605,7 @@ public static class StatisticsData {
|
||||
private volatile long bytesReadDistanceOfOneOrTwo;
|
||||
private volatile long bytesReadDistanceOfThreeOrFour;
|
||||
private volatile long bytesReadDistanceOfFiveOrLarger;
|
||||
private volatile long bytesReadErasureCoded;
|
||||
|
||||
/**
|
||||
* Add another StatisticsData object to this one.
|
||||
@ -3621,6 +3622,7 @@ void add(StatisticsData other) {
|
||||
other.bytesReadDistanceOfThreeOrFour;
|
||||
this.bytesReadDistanceOfFiveOrLarger +=
|
||||
other.bytesReadDistanceOfFiveOrLarger;
|
||||
this.bytesReadErasureCoded += other.bytesReadErasureCoded;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3638,6 +3640,7 @@ void negate() {
|
||||
-this.bytesReadDistanceOfThreeOrFour;
|
||||
this.bytesReadDistanceOfFiveOrLarger =
|
||||
-this.bytesReadDistanceOfFiveOrLarger;
|
||||
this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -3682,6 +3685,10 @@ public long getBytesReadDistanceOfThreeOrFour() {
|
||||
public long getBytesReadDistanceOfFiveOrLarger() {
|
||||
return bytesReadDistanceOfFiveOrLarger;
|
||||
}
|
||||
|
||||
public long getBytesReadErasureCoded() {
|
||||
return bytesReadErasureCoded;
|
||||
}
|
||||
}
|
||||
|
||||
private interface StatisticsAggregator<T> {
|
||||
@ -3873,6 +3880,14 @@ public void incrementWriteOps(int count) {
|
||||
getThreadStatistics().writeOps += count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the bytes read on erasure-coded files in the statistics.
|
||||
* @param newBytes the additional bytes read
|
||||
*/
|
||||
public void incrementBytesReadErasureCoded(long newBytes) {
|
||||
getThreadStatistics().bytesReadErasureCoded += newBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the bytes read by the network distance in the statistics
|
||||
* In the common network topology setup, distance value should be an even
|
||||
@ -4067,6 +4082,25 @@ public StatisticsData aggregate() {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total number of bytes read on erasure-coded files.
|
||||
* @return the number of bytes
|
||||
*/
|
||||
public long getBytesReadErasureCoded() {
|
||||
return visitAll(new StatisticsAggregator<Long>() {
|
||||
private long bytesReadErasureCoded = 0;
|
||||
|
||||
@Override
|
||||
public void accept(StatisticsData data) {
|
||||
bytesReadErasureCoded += data.bytesReadErasureCoded;
|
||||
}
|
||||
|
||||
public Long aggregate() {
|
||||
return bytesReadErasureCoded;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return visitAll(new StatisticsAggregator<String>() {
|
||||
|
@ -46,7 +46,8 @@ public class FileSystemStorageStatistics extends StorageStatistics {
|
||||
"bytesReadLocalHost",
|
||||
"bytesReadDistanceOfOneOrTwo",
|
||||
"bytesReadDistanceOfThreeOrFour",
|
||||
"bytesReadDistanceOfFiveOrLarger"
|
||||
"bytesReadDistanceOfFiveOrLarger",
|
||||
"bytesReadErasureCoded"
|
||||
};
|
||||
|
||||
private static class LongStatisticIterator
|
||||
@ -104,6 +105,8 @@ private static Long fetch(StatisticsData data, String key) {
|
||||
return data.getBytesReadDistanceOfThreeOrFour();
|
||||
case "bytesReadDistanceOfFiveOrLarger":
|
||||
return data.getBytesReadDistanceOfFiveOrLarger();
|
||||
case "bytesReadErasureCoded":
|
||||
return data.getBytesReadErasureCoded();
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
@ -51,7 +51,8 @@ public class TestFileSystemStorageStatistics {
|
||||
"bytesReadLocalHost",
|
||||
"bytesReadDistanceOfOneOrTwo",
|
||||
"bytesReadDistanceOfThreeOrFour",
|
||||
"bytesReadDistanceOfFiveOrLarger"
|
||||
"bytesReadDistanceOfFiveOrLarger",
|
||||
"bytesReadErasureCoded"
|
||||
};
|
||||
|
||||
private FileSystem.Statistics statistics =
|
||||
@ -74,6 +75,7 @@ public void setup() {
|
||||
statistics.incrementBytesReadByDistance(0, RandomUtils.nextInt(100));
|
||||
statistics.incrementBytesReadByDistance(1, RandomUtils.nextInt(100));
|
||||
statistics.incrementBytesReadByDistance(3, RandomUtils.nextInt(100));
|
||||
statistics.incrementBytesReadErasureCoded(RandomUtils.nextInt(100));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -126,6 +128,8 @@ private long getStatisticsValue(String name) {
|
||||
return statistics.getBytesReadByDistance(3);
|
||||
case "bytesReadDistanceOfFiveOrLarger":
|
||||
return statistics.getBytesReadByDistance(5);
|
||||
case "bytesReadErasureCoded":
|
||||
return statistics.getBytesReadErasureCoded();
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
@ -2942,6 +2942,12 @@ void updateFileSystemReadStats(int distance, int nRead) {
|
||||
}
|
||||
}
|
||||
|
||||
void updateFileSystemECReadStats(int nRead) {
|
||||
if (stats != null) {
|
||||
stats.incrementBytesReadErasureCoded(nRead);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
|
||||
* it does not already exist.
|
||||
|
@ -61,6 +61,7 @@
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -1082,6 +1083,9 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
|
||||
IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
|
||||
dfsClient.updateFileSystemReadStats(
|
||||
reader.getNetworkDistance(), nread);
|
||||
if (readStatistics.getBlockType() == BlockType.STRIPED) {
|
||||
dfsClient.updateFileSystemECReadStats(nread);
|
||||
}
|
||||
if (nread != len) {
|
||||
throw new IOException("truncated return from reader.read(): " +
|
||||
"excpected " + len + ", got " + nread);
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
|
||||
@ -121,6 +123,9 @@ public int readFromBlock(BlockReader blockReader,
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
if (readStatistics.getBlockType() == BlockType.STRIPED) {
|
||||
dfsClient.updateFileSystemECReadStats(nRead);
|
||||
}
|
||||
offset += nRead;
|
||||
}
|
||||
return nRead;
|
||||
@ -188,6 +193,9 @@ public int readFromBlock(BlockReader blockReader,
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
if (readStatistics.getBlockType() == BlockType.STRIPED) {
|
||||
dfsClient.updateFileSystemECReadStats(nRead);
|
||||
}
|
||||
}
|
||||
|
||||
return nRead;
|
||||
|
@ -86,6 +86,7 @@ abstract public class Task implements Writable, Configurable {
|
||||
|
||||
public static String MERGED_OUTPUT_PREFIX = ".merged";
|
||||
public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
|
||||
private static final String HDFS_URI_SCHEME = "hdfs";
|
||||
|
||||
/**
|
||||
* @deprecated Provided for compatibility. Use {@link TaskCounter} instead.
|
||||
@ -1125,7 +1126,8 @@ public void incrementGcCounter() {
|
||||
class FileSystemStatisticUpdater {
|
||||
private List<FileSystem.Statistics> stats;
|
||||
private Counters.Counter readBytesCounter, writeBytesCounter,
|
||||
readOpsCounter, largeReadOpsCounter, writeOpsCounter;
|
||||
readOpsCounter, largeReadOpsCounter, writeOpsCounter,
|
||||
readBytesEcCounter;
|
||||
private String scheme;
|
||||
FileSystemStatisticUpdater(List<FileSystem.Statistics> stats, String scheme) {
|
||||
this.stats = stats;
|
||||
@ -1153,23 +1155,33 @@ void updateCounters() {
|
||||
writeOpsCounter = counters.findCounter(scheme,
|
||||
FileSystemCounter.WRITE_OPS);
|
||||
}
|
||||
if (readBytesEcCounter == null && scheme.equals(HDFS_URI_SCHEME)) {
|
||||
// EC bytes only applies to hdfs
|
||||
readBytesEcCounter =
|
||||
counters.findCounter(scheme, FileSystemCounter.BYTES_READ_EC);
|
||||
}
|
||||
long readBytes = 0;
|
||||
long writeBytes = 0;
|
||||
long readOps = 0;
|
||||
long largeReadOps = 0;
|
||||
long writeOps = 0;
|
||||
long readBytesEC = 0;
|
||||
for (FileSystem.Statistics stat: stats) {
|
||||
readBytes = readBytes + stat.getBytesRead();
|
||||
writeBytes = writeBytes + stat.getBytesWritten();
|
||||
readOps = readOps + stat.getReadOps();
|
||||
largeReadOps = largeReadOps + stat.getLargeReadOps();
|
||||
writeOps = writeOps + stat.getWriteOps();
|
||||
readBytesEC = readBytesEC + stat.getBytesReadErasureCoded();
|
||||
}
|
||||
readBytesCounter.setValue(readBytes);
|
||||
writeBytesCounter.setValue(writeBytes);
|
||||
readOpsCounter.setValue(readOps);
|
||||
largeReadOpsCounter.setValue(largeReadOps);
|
||||
writeOpsCounter.setValue(writeOps);
|
||||
if (readBytesEcCounter != null) {
|
||||
readBytesEcCounter.setValue(readBytesEC);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,4 +27,5 @@ public enum FileSystemCounter {
|
||||
READ_OPS,
|
||||
LARGE_READ_OPS,
|
||||
WRITE_OPS,
|
||||
BYTES_READ_EC,
|
||||
}
|
||||
|
@ -19,3 +19,4 @@ BYTES_WRITTEN.name= Number of bytes written
|
||||
READ_OPS.name= Number of read operations
|
||||
LARGE_READ_OPS.name= Number of large read operations
|
||||
WRITE_OPS.name= Number of write operations
|
||||
BYTES_READ_EC.name= Number of bytes read erasure-coded
|
||||
|
Loading…
Reference in New Issue
Block a user