This commit is contained in:
parent
06f9bdffa6
commit
2b60d0c1f4
@ -3942,6 +3942,7 @@ public abstract class FileSystem extends Configured
|
||||
private volatile long bytesReadDistanceOfThreeOrFour;
|
||||
private volatile long bytesReadDistanceOfFiveOrLarger;
|
||||
private volatile long bytesReadErasureCoded;
|
||||
private volatile long remoteReadTimeMS;
|
||||
|
||||
/**
|
||||
* Add another StatisticsData object to this one.
|
||||
@ -3959,6 +3960,7 @@ public abstract class FileSystem extends Configured
|
||||
this.bytesReadDistanceOfFiveOrLarger +=
|
||||
other.bytesReadDistanceOfFiveOrLarger;
|
||||
this.bytesReadErasureCoded += other.bytesReadErasureCoded;
|
||||
this.remoteReadTimeMS += other.remoteReadTimeMS;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3977,6 +3979,7 @@ public abstract class FileSystem extends Configured
|
||||
this.bytesReadDistanceOfFiveOrLarger =
|
||||
-this.bytesReadDistanceOfFiveOrLarger;
|
||||
this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
|
||||
this.remoteReadTimeMS = -this.remoteReadTimeMS;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -4025,6 +4028,10 @@ public abstract class FileSystem extends Configured
|
||||
public long getBytesReadErasureCoded() {
|
||||
return bytesReadErasureCoded;
|
||||
}
|
||||
|
||||
public long getRemoteReadTimeMS() {
|
||||
return remoteReadTimeMS;
|
||||
}
|
||||
}
|
||||
|
||||
private interface StatisticsAggregator<T> {
|
||||
@ -4252,6 +4259,14 @@ public abstract class FileSystem extends Configured
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the time taken to read bytes from remote in the statistics.
|
||||
* @param durationMS time taken in ms to read bytes from remote
|
||||
*/
|
||||
public void increaseRemoteReadTime(final long durationMS) {
|
||||
getThreadStatistics().remoteReadTimeMS += durationMS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the given aggregator to all StatisticsData objects associated with
|
||||
* this Statistics object.
|
||||
@ -4399,6 +4414,25 @@ public abstract class FileSystem extends Configured
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total time taken in ms for bytes read from remote.
|
||||
* @return time taken in ms for remote bytes read.
|
||||
*/
|
||||
public long getRemoteReadTime() {
|
||||
return visitAll(new StatisticsAggregator<Long>() {
|
||||
private long remoteReadTimeMS = 0;
|
||||
|
||||
@Override
|
||||
public void accept(StatisticsData data) {
|
||||
remoteReadTimeMS += data.remoteReadTimeMS;
|
||||
}
|
||||
|
||||
public Long aggregate() {
|
||||
return remoteReadTimeMS;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all statistics data.
|
||||
* MR or other frameworks can use the method to get all statistics at once.
|
||||
|
@ -47,7 +47,8 @@ public class FileSystemStorageStatistics extends StorageStatistics {
|
||||
"bytesReadDistanceOfOneOrTwo",
|
||||
"bytesReadDistanceOfThreeOrFour",
|
||||
"bytesReadDistanceOfFiveOrLarger",
|
||||
"bytesReadErasureCoded"
|
||||
"bytesReadErasureCoded",
|
||||
"remoteReadTimeMS"
|
||||
};
|
||||
|
||||
private static class LongStatisticIterator
|
||||
@ -107,6 +108,8 @@ public class FileSystemStorageStatistics extends StorageStatistics {
|
||||
return data.getBytesReadDistanceOfFiveOrLarger();
|
||||
case "bytesReadErasureCoded":
|
||||
return data.getBytesReadErasureCoded();
|
||||
case "remoteReadTimeMS":
|
||||
return data.getRemoteReadTimeMS();
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
@ -52,7 +52,8 @@ public class TestFileSystemStorageStatistics {
|
||||
"bytesReadDistanceOfOneOrTwo",
|
||||
"bytesReadDistanceOfThreeOrFour",
|
||||
"bytesReadDistanceOfFiveOrLarger",
|
||||
"bytesReadErasureCoded"
|
||||
"bytesReadErasureCoded",
|
||||
"remoteReadTimeMS"
|
||||
};
|
||||
|
||||
private FileSystem.Statistics statistics =
|
||||
@ -74,6 +75,7 @@ public class TestFileSystemStorageStatistics {
|
||||
statistics.incrementBytesReadByDistance(1, RandomUtils.nextInt(0, 100));
|
||||
statistics.incrementBytesReadByDistance(3, RandomUtils.nextInt(0, 100));
|
||||
statistics.incrementBytesReadErasureCoded(RandomUtils.nextInt(0, 100));
|
||||
statistics.increaseRemoteReadTime(RandomUtils.nextInt(0, 100));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -128,6 +130,8 @@ public class TestFileSystemStorageStatistics {
|
||||
return statistics.getBytesReadByDistance(5);
|
||||
case "bytesReadErasureCoded":
|
||||
return statistics.getBytesReadErasureCoded();
|
||||
case "remoteReadTimeMS":
|
||||
return statistics.getRemoteReadTime();
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
@ -3090,10 +3090,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||
}
|
||||
}
|
||||
|
||||
void updateFileSystemReadStats(int distance, int nRead) {
|
||||
void updateFileSystemReadStats(int distance, int readBytes, long readTimeMS) {
|
||||
if (stats != null) {
|
||||
stats.incrementBytesRead(nRead);
|
||||
stats.incrementBytesReadByDistance(distance, nRead);
|
||||
stats.incrementBytesRead(readBytes);
|
||||
stats.incrementBytesReadByDistance(distance, readBytes);
|
||||
if (distance > 0) {
|
||||
//remote read
|
||||
stats.increaseRemoteReadTime(readTimeMS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -851,8 +851,9 @@ public class DFSInputStream extends FSInputStream
|
||||
locatedBlocks.getFileLength() - pos);
|
||||
}
|
||||
}
|
||||
long beginReadMS = Time.monotonicNow();
|
||||
int result = readBuffer(strategy, realLen, corruptedBlocks);
|
||||
|
||||
long readTimeMS = Time.monotonicNow() - beginReadMS;
|
||||
if (result >= 0) {
|
||||
pos += result;
|
||||
} else {
|
||||
@ -861,7 +862,7 @@ public class DFSInputStream extends FSInputStream
|
||||
}
|
||||
updateReadStatistics(readStatistics, result, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
result);
|
||||
result, readTimeMS);
|
||||
if (readStatistics.getBlockType() == BlockType.STRIPED) {
|
||||
dfsClient.updateFileSystemECReadStats(result);
|
||||
}
|
||||
@ -1184,6 +1185,7 @@ public class DFSInputStream extends FSInputStream
|
||||
ByteBuffer tmp = buf.duplicate();
|
||||
tmp.limit(tmp.position() + len);
|
||||
tmp = tmp.slice();
|
||||
long beginReadMS = Time.monotonicNow();
|
||||
int nread = 0;
|
||||
int ret;
|
||||
while (true) {
|
||||
@ -1193,11 +1195,12 @@ public class DFSInputStream extends FSInputStream
|
||||
}
|
||||
nread += ret;
|
||||
}
|
||||
long readTimeMS = Time.monotonicNow() - beginReadMS;
|
||||
buf.position(buf.position() + nread);
|
||||
|
||||
IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
|
||||
dfsClient.updateFileSystemReadStats(
|
||||
reader.getNetworkDistance(), nread);
|
||||
reader.getNetworkDistance(), nread, readTimeMS);
|
||||
if (readStatistics.getBlockType() == BlockType.STRIPED) {
|
||||
dfsClient.updateFileSystemECReadStats(nread);
|
||||
}
|
||||
|
@ -331,15 +331,17 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
* its ThreadLocal.
|
||||
*
|
||||
* @param stats striped read stats
|
||||
* @param readTimeMS read time metrics in ms
|
||||
*
|
||||
*/
|
||||
void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
|
||||
void updateReadStats(final StripedBlockUtil.BlockReadStats stats, long readTimeMS) {
|
||||
if (stats == null) {
|
||||
return;
|
||||
}
|
||||
updateReadStatistics(readStatistics, stats.getBytesRead(),
|
||||
stats.isShortCircuit(), stats.getNetworkDistance());
|
||||
dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
|
||||
stats.getBytesRead());
|
||||
stats.getBytesRead(), readTimeMS);
|
||||
assert readStatistics.getBlockType() == BlockType.STRIPED;
|
||||
dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
|
||||
}
|
||||
|
@ -351,9 +351,12 @@ abstract class StripeReader {
|
||||
// first read failure
|
||||
while (!futures.isEmpty()) {
|
||||
try {
|
||||
long beginReadMS = Time.monotonicNow();
|
||||
StripingChunkReadResult r = StripedBlockUtil
|
||||
.getNextCompletedStripedRead(service, futures, 0);
|
||||
dfsStripedInputStream.updateReadStats(r.getReadStats());
|
||||
long readTimeMS = Time.monotonicNow() - beginReadMS;
|
||||
|
||||
dfsStripedInputStream.updateReadStats(r.getReadStats(), readTimeMS);
|
||||
DFSClient.LOG.debug("Read task returned: {}, for stripe {}",
|
||||
r, alignedStripe);
|
||||
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
|
||||
|
Loading…
x
Reference in New Issue
Block a user