diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 45c323aa9d..10f5624a76 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -425,6 +425,12 @@ Each metrics record contains tags such as SessionId and Hostname as additional i | `RamDiskBlocksLazyPersistWindows`*num*`s(50/75/90/95/99)thPercentileLatency` | The 50/75/90/95/99th percentile of latency between memory write and disk persist in milliseconds (*num* seconds granularity). Percentile measurement is off by default, by watching no intervals. The intervals are specified by `dfs.metrics.percentiles.intervals`. | | `FsyncCount` | Total number of fsync | | `VolumeFailures` | Total number of volume failures occurred | +| `DatanodeNetworkErrors` | Count of network errors on the datanode | +| `DataNodeActiveXceiversCount` | Count of active dataNode xceivers | +| `DataNodeReadActiveXceiversCount` | Count of read active dataNode xceivers | +| `DataNodeWriteActiveXceiversCount` | Count of write active dataNode xceivers | +| `DataNodePacketResponderCount` | Count of active DataNode packetResponder | +| `DataNodeBlockRecoveryWorkerCount` | Count of active DataNode block recovery worker | | `ReadBlockOpNumOps` | Total number of read operations | | `ReadBlockOpAvgTime` | Average time of read operations in milliseconds | | `WriteBlockOpNumOps` | Total number of write operations | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 53ec38dfa5..956f5bbe51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java 
@@ -2639,6 +2639,8 @@ public void shutdown() { } if (metrics != null) { metrics.setDataNodeActiveXceiversCount(0); + metrics.setDataNodeReadActiveXceiversCount(0); + metrics.setDataNodeWriteActiveXceiversCount(0); metrics.setDataNodePacketResponderCount(0); metrics.setDataNodeBlockRecoveryWorkerCount(0); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 017e57012b..d948c1caef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -223,6 +223,7 @@ private synchronized BlockReceiver getCurrentBlockReceiver() { public void run() { int opsProcessed = 0; Op op = null; + Op firstOp = null; try { synchronized(this) { @@ -290,6 +291,11 @@ public void run() { } opStartTime = monotonicNow(); + // compatible with loop retry requests + if (firstOp == null) { + firstOp = op; + incrReadWriteOpMetrics(op); + } processOp(op); ++opsProcessed; } while ((peer != null) && @@ -330,6 +336,9 @@ public void run() { datanode.getDisplayName(), datanode.getXceiverCount()); updateCurrentThreadName("Cleaning up"); if (peer != null) { + if (firstOp != null) { + decrReadWriteOpMetrics(firstOp); + } dataXceiverServer.closePeer(peer); IOUtils.closeStream(in); } @@ -1466,4 +1475,20 @@ private void checkAccess(OutputStream out, final boolean reply, } } } + + private void incrReadWriteOpMetrics(Op op) { + if (Op.READ_BLOCK.equals(op)) { + datanode.getMetrics().incrDataNodeReadActiveXceiversCount(); + } else if (Op.WRITE_BLOCK.equals(op)) { + datanode.getMetrics().incrDataNodeWriteActiveXceiversCount(); + } + } + + private void decrReadWriteOpMetrics(Op op) { + if (Op.READ_BLOCK.equals(op)) { + datanode.getMetrics().decrDataNodeReadActiveXceiversCount(); + } else 
if (Op.WRITE_BLOCK.equals(op)) { + datanode.getMetrics().decrDataNodeWriteActiveXceiversCount(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index ad61f94b92..b936814bdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -413,6 +413,8 @@ void closeAllPeers() { peers.clear(); peersXceiver.clear(); datanode.metrics.setDataNodeActiveXceiversCount(0); + datanode.metrics.setDataNodeReadActiveXceiversCount(0); + datanode.metrics.setDataNodeWriteActiveXceiversCount(0); this.noPeers.signalAll(); } finally { lock.unlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index da19f28711..77e6dab067 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -111,6 +111,12 @@ public class DataNodeMetrics { @Metric("Count of active dataNode xceivers") private MutableGaugeInt dataNodeActiveXceiversCount; + @Metric("Count of read active dataNode xceivers") + private MutableGaugeInt dataNodeReadActiveXceiversCount; + + @Metric("Count of write active dataNode xceivers") + private MutableGaugeInt dataNodeWriteActiveXceiversCount; + @Metric("Count of active DataNode packetResponder") private MutableGaugeInt dataNodePacketResponderCount; @@ -599,6 +605,30 @@ public int getDataNodeActiveXceiverCount() { return 
dataNodeActiveXceiversCount.value(); } + public void incrDataNodeReadActiveXceiversCount() { + dataNodeReadActiveXceiversCount.incr(); + } + + public void decrDataNodeReadActiveXceiversCount() { + dataNodeReadActiveXceiversCount.decr(); + } + + public void setDataNodeReadActiveXceiversCount(int value) { + dataNodeReadActiveXceiversCount.set(value); + } + + public void incrDataNodeWriteActiveXceiversCount() { + dataNodeWriteActiveXceiversCount.incr(); + } + + public void decrDataNodeWriteActiveXceiversCount() { + dataNodeWriteActiveXceiversCount.decr(); + } + + public void setDataNodeWriteActiveXceiversCount(int value) { + dataNodeWriteActiveXceiversCount.set(value); + } + public void incrDataNodePacketResponderCount() { dataNodePacketResponderCount.incr(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index 89a9fdd1a8..3a0b523836 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -38,6 +38,8 @@ import java.util.function.Supplier; import net.jcip.annotations.NotThreadSafe; + +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.net.unix.DomainSocket; @@ -751,4 +753,67 @@ public void testNodeLocalMetrics() throws Exception { } } } + + @Test + public void testDataNodeReadWriteXceiversCount() throws Exception { + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build()) { + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + List<DataNode> datanodes = cluster.getDataNodes(); + assertEquals(1, datanodes.size()); + DataNode datanode = datanodes.get(0); + + 
// Test DataNodeWriteActiveXceiversCount Metric + long writeXceiversCount = MetricsAsserts.getIntGauge("DataNodeWriteActiveXceiversCount", + getMetrics(datanode.getMetrics().name())); + assertEquals(0, writeXceiversCount); + + Path path = new Path("/testDataNodeReadWriteXceiversCount.txt"); + try (FSDataOutputStream output = fs.create(path)) { + output.write(new byte[1024]); + output.hsync(); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + int writeXceiversCount = MetricsAsserts.getIntGauge("DataNodeWriteActiveXceiversCount", + getMetrics(datanode.getMetrics().name())); + return writeXceiversCount == 1; + } + }, 100, 10000); + } + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + int writeXceiversCount = MetricsAsserts.getIntGauge("DataNodeWriteActiveXceiversCount", + getMetrics(datanode.getMetrics().name())); + return writeXceiversCount == 0; + } + }, 100, 10000); + + // Test DataNodeReadActiveXceiversCount Metric + long readXceiversCount = MetricsAsserts.getIntGauge("DataNodeReadActiveXceiversCount", + getMetrics(datanode.getMetrics().name())); + assertEquals(0, readXceiversCount); + try (FSDataInputStream input = fs.open(path)) { + byte[] byteArray = new byte[1024]; + input.read(byteArray); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + int readXceiversCount = MetricsAsserts.getIntGauge("DataNodeReadActiveXceiversCount", + getMetrics(datanode.getMetrics().name())); + return readXceiversCount == 1; + } + }, 100, 10000); + } + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + int readXceiversCount = MetricsAsserts.getIntGauge("DataNodeReadActiveXceiversCount", + getMetrics(datanode.getMetrics().name())); + return readXceiversCount == 0; + } + }, 100, 10000); + } + } }