HDFS-16917 Add transfer rate quantile metrics for DataNode reads (#5397)
Co-authored-by: Ravindra Dingankar <rdingankar@linkedin.com>
This commit is contained in:
parent
c25ac781ca
commit
94b3c6dd90
@ -368,6 +368,9 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|
|||||||
|:---- |:---- |
|
|:---- |:---- |
|
||||||
| `BytesWritten` | Total number of bytes written to DataNode |
|
| `BytesWritten` | Total number of bytes written to DataNode |
|
||||||
| `BytesRead` | Total number of bytes read from DataNode |
|
| `BytesRead` | Total number of bytes read from DataNode |
|
||||||
|
| `ReadTransferRateNumOps` | Total number of data read transfers |
|
||||||
|
| `ReadTransferRateAvgTime` | Average transfer rate of bytes read from DataNode, measured in bytes per second. |
|
||||||
|
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
|
||||||
| `BlocksWritten` | Total number of blocks written to DataNode |
|
| `BlocksWritten` | Total number of blocks written to DataNode |
|
||||||
| `BlocksRead` | Total number of blocks read from DataNode |
|
| `BlocksRead` | Total number of blocks read from DataNode |
|
||||||
| `BlocksReplicated` | Total number of blocks replicated |
|
| `BlocksReplicated` | Total number of blocks replicated |
|
||||||
|
@ -69,6 +69,7 @@
|
|||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
|
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
@ -1893,4 +1894,18 @@ public static boolean isParentEntry(final String path, final String parent) {
|
|||||||
return path.charAt(parent.length()) == Path.SEPARATOR_CHAR
|
return path.charAt(parent.length()) == Path.SEPARATOR_CHAR
|
||||||
|| parent.equals(Path.SEPARATOR);
|
|| parent.equals(Path.SEPARATOR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add transfer rate metrics for valid data read and duration values.
|
||||||
|
* @param metrics metrics for datanodes
|
||||||
|
* @param read bytes read
|
||||||
|
* @param duration read duration
|
||||||
|
*/
|
||||||
|
public static void addTransferRateMetric(final DataNodeMetrics metrics, final long read, final long duration) {
|
||||||
|
if (read >= 0 && duration > 0) {
|
||||||
|
metrics.addReadTransferRate(read * 1000 / duration);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.fs.FsTracer;
|
import org.apache.hadoop.fs.FsTracer;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
@ -633,6 +634,7 @@ public void readBlock(final ExtendedBlock block,
|
|||||||
datanode.metrics.incrBytesRead((int) read);
|
datanode.metrics.incrBytesRead((int) read);
|
||||||
datanode.metrics.incrBlocksRead();
|
datanode.metrics.incrBlocksRead();
|
||||||
datanode.metrics.incrTotalReadTime(duration);
|
datanode.metrics.incrTotalReadTime(duration);
|
||||||
|
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
|
||||||
} catch ( SocketException ignored ) {
|
} catch ( SocketException ignored ) {
|
||||||
LOG.trace("{}:Ignoring exception while serving {} to {}",
|
LOG.trace("{}:Ignoring exception while serving {} to {}",
|
||||||
dnR, block, remoteAddress, ignored);
|
dnR, block, remoteAddress, ignored);
|
||||||
@ -1122,6 +1124,7 @@ public void copyBlock(final ExtendedBlock block,
|
|||||||
datanode.metrics.incrBytesRead((int) read);
|
datanode.metrics.incrBytesRead((int) read);
|
||||||
datanode.metrics.incrBlocksRead();
|
datanode.metrics.incrBlocksRead();
|
||||||
datanode.metrics.incrTotalReadTime(duration);
|
datanode.metrics.incrTotalReadTime(duration);
|
||||||
|
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
|
||||||
|
|
||||||
LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
|
LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
@ -61,6 +61,8 @@ public class DataNodeMetrics {
|
|||||||
@Metric MutableCounterLong bytesRead;
|
@Metric MutableCounterLong bytesRead;
|
||||||
@Metric("Milliseconds spent reading")
|
@Metric("Milliseconds spent reading")
|
||||||
MutableCounterLong totalReadTime;
|
MutableCounterLong totalReadTime;
|
||||||
|
@Metric private MutableRate readTransferRate;
|
||||||
|
final private MutableQuantiles[] readTransferRateQuantiles;
|
||||||
@Metric MutableCounterLong blocksWritten;
|
@Metric MutableCounterLong blocksWritten;
|
||||||
@Metric MutableCounterLong blocksRead;
|
@Metric MutableCounterLong blocksRead;
|
||||||
@Metric MutableCounterLong blocksReplicated;
|
@Metric MutableCounterLong blocksReplicated;
|
||||||
@ -201,6 +203,7 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
|
|||||||
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
|
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
|
||||||
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
|
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
|
||||||
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
|
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
|
||||||
|
readTransferRateQuantiles = new MutableQuantiles[len];
|
||||||
|
|
||||||
for (int i = 0; i < len; i++) {
|
for (int i = 0; i < len; i++) {
|
||||||
int interval = intervals[i];
|
int interval = intervals[i];
|
||||||
@ -229,6 +232,10 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
|
|||||||
"ramDiskBlocksLazyPersistWindows" + interval + "s",
|
"ramDiskBlocksLazyPersistWindows" + interval + "s",
|
||||||
"Time between the RamDisk block write and disk persist in ms",
|
"Time between the RamDisk block write and disk persist in ms",
|
||||||
"ops", "latency", interval);
|
"ops", "latency", interval);
|
||||||
|
readTransferRateQuantiles[i] = registry.newQuantiles(
|
||||||
|
"readTransferRate" + interval + "s",
|
||||||
|
"Rate at which bytes are read from datanode calculated in bytes per second",
|
||||||
|
"ops", "rate", interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -290,6 +297,13 @@ public void addIncrementalBlockReport(long latency,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addReadTransferRate(long readTransferRate) {
|
||||||
|
this.readTransferRate.add(readTransferRate);
|
||||||
|
for (MutableQuantiles q : readTransferRateQuantiles) {
|
||||||
|
q.add(readTransferRate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void addCacheReport(long latency) {
|
public void addCacheReport(long latency) {
|
||||||
cacheReports.add(latency);
|
cacheReports.add(latency);
|
||||||
}
|
}
|
||||||
|
@ -45,6 +45,7 @@
|
|||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -70,6 +71,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
@ -1109,4 +1111,18 @@ public void testErrorMessageForInvalidNameservice() throws Exception {
|
|||||||
LambdaTestUtils.intercept(IOException.class, expectedErrorMessage,
|
LambdaTestUtils.intercept(IOException.class, expectedErrorMessage,
|
||||||
()->DFSUtil.getNNServiceRpcAddressesForCluster(conf));
|
()->DFSUtil.getNNServiceRpcAddressesForCluster(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddTransferRateMetricForValidValues() {
|
||||||
|
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
|
||||||
|
DFSUtil.addTransferRateMetric(mockMetrics, 100, 10);
|
||||||
|
verify(mockMetrics).addReadTransferRate(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddTransferRateMetricForInvalidValue() {
|
||||||
|
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
|
||||||
|
DFSUtil.addTransferRateMetric(mockMetrics, 100, 0);
|
||||||
|
verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -274,6 +274,7 @@ public void testTimeoutMetric() throws Exception {
|
|||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testDataNodeTimeSpend() throws Exception {
|
public void testDataNodeTimeSpend() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + 60);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
try {
|
try {
|
||||||
final FileSystem fs = cluster.getFileSystem();
|
final FileSystem fs = cluster.getFileSystem();
|
||||||
@ -285,6 +286,7 @@ public void testDataNodeTimeSpend() throws Exception {
|
|||||||
|
|
||||||
final long startWriteValue = getLongCounter("TotalWriteTime", rb);
|
final long startWriteValue = getLongCounter("TotalWriteTime", rb);
|
||||||
final long startReadValue = getLongCounter("TotalReadTime", rb);
|
final long startReadValue = getLongCounter("TotalReadTime", rb);
|
||||||
|
assertCounter("ReadTransferRateNumOps", 0L, rb);
|
||||||
final AtomicInteger x = new AtomicInteger(0);
|
final AtomicInteger x = new AtomicInteger(0);
|
||||||
|
|
||||||
// Lets Metric system update latest metrics
|
// Lets Metric system update latest metrics
|
||||||
@ -304,6 +306,8 @@ public Boolean get() {
|
|||||||
MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
|
MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
|
||||||
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
|
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
|
||||||
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
|
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
|
||||||
|
assertCounter("ReadTransferRateNumOps", 1L, rbNew);
|
||||||
|
assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
|
||||||
return endWriteValue > startWriteValue
|
return endWriteValue > startWriteValue
|
||||||
&& endReadValue > startReadValue;
|
&& endReadValue > startReadValue;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user