HDFS-17301. Add read and write dataXceiver threads count metrics to datanode. (#6377)

Reviewed-by: hfutatzhanghb <hfutzhanghb@163.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
huangzhaobo 2023-12-29 11:43:46 +08:00 committed by GitHub
parent 3aceec711b
commit e26139beaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 0 deletions

View File

@ -425,6 +425,12 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `RamDiskBlocksLazyPersistWindows`*num*`s(50/75/90/95/99)thPercentileLatency` | The 50/75/90/95/99th percentile of latency between memory write and disk persist in milliseconds (*num* seconds granularity). Percentile measurement is off by default, by watching no intervals. The intervals are specified by `dfs.metrics.percentiles.intervals`. | | `RamDiskBlocksLazyPersistWindows`*num*`s(50/75/90/95/99)thPercentileLatency` | The 50/75/90/95/99th percentile of latency between memory write and disk persist in milliseconds (*num* seconds granularity). Percentile measurement is off by default, by watching no intervals. The intervals are specified by `dfs.metrics.percentiles.intervals`. |
| `FsyncCount` | Total number of fsync | | `FsyncCount` | Total number of fsync |
| `VolumeFailures` | Total number of volume failures occurred | | `VolumeFailures` | Total number of volume failures occurred |
| `DatanodeNetworkErrors` | Count of network errors on the datanode |
| `DataNodeActiveXceiversCount` | Count of active dataNode xceivers |
| `DataNodeReadActiveXceiversCount` | Count of read active dataNode xceivers |
| `DataNodeWriteActiveXceiversCount` | Count of write active dataNode xceivers |
| `DataNodePacketResponderCount` | Count of active DataNode packetResponder |
| `DataNodeBlockRecoveryWorkerCount` | Count of active DataNode block recovery worker |
| `ReadBlockOpNumOps` | Total number of read operations | | `ReadBlockOpNumOps` | Total number of read operations |
| `ReadBlockOpAvgTime` | Average time of read operations in milliseconds | | `ReadBlockOpAvgTime` | Average time of read operations in milliseconds |
| `WriteBlockOpNumOps` | Total number of write operations | | `WriteBlockOpNumOps` | Total number of write operations |

View File

@ -2639,6 +2639,8 @@ public void shutdown() {
} }
if (metrics != null) { if (metrics != null) {
metrics.setDataNodeActiveXceiversCount(0); metrics.setDataNodeActiveXceiversCount(0);
metrics.setDataNodeReadActiveXceiversCount(0);
metrics.setDataNodeWriteActiveXceiversCount(0);
metrics.setDataNodePacketResponderCount(0); metrics.setDataNodePacketResponderCount(0);
metrics.setDataNodeBlockRecoveryWorkerCount(0); metrics.setDataNodeBlockRecoveryWorkerCount(0);
} }

View File

@ -223,6 +223,7 @@ private synchronized BlockReceiver getCurrentBlockReceiver() {
public void run() { public void run() {
int opsProcessed = 0; int opsProcessed = 0;
Op op = null; Op op = null;
Op firstOp = null;
try { try {
synchronized(this) { synchronized(this) {
@ -290,6 +291,11 @@ public void run() {
} }
opStartTime = monotonicNow(); opStartTime = monotonicNow();
// compatible with loop retry requests
if (firstOp == null) {
firstOp = op;
incrReadWriteOpMetrics(op);
}
processOp(op); processOp(op);
++opsProcessed; ++opsProcessed;
} while ((peer != null) && } while ((peer != null) &&
@ -330,6 +336,9 @@ public void run() {
datanode.getDisplayName(), datanode.getXceiverCount()); datanode.getDisplayName(), datanode.getXceiverCount());
updateCurrentThreadName("Cleaning up"); updateCurrentThreadName("Cleaning up");
if (peer != null) { if (peer != null) {
if (firstOp != null) {
decrReadWriteOpMetrics(op);
}
dataXceiverServer.closePeer(peer); dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in); IOUtils.closeStream(in);
} }
@ -1466,4 +1475,20 @@ private void checkAccess(OutputStream out, final boolean reply,
} }
} }
} }
private void incrReadWriteOpMetrics(Op op) {
if (Op.READ_BLOCK.equals(op)) {
datanode.getMetrics().incrDataNodeReadActiveXceiversCount();
} else if (Op.WRITE_BLOCK.equals(op)) {
datanode.getMetrics().incrDataNodeWriteActiveXceiversCount();
}
}
private void decrReadWriteOpMetrics(Op op) {
if (Op.READ_BLOCK.equals(op)) {
datanode.getMetrics().decrDataNodeReadActiveXceiversCount();
} else if (Op.WRITE_BLOCK.equals(op)) {
datanode.getMetrics().decrDataNodeWriteActiveXceiversCount();
}
}
} }

View File

@ -413,6 +413,8 @@ void closeAllPeers() {
peers.clear(); peers.clear();
peersXceiver.clear(); peersXceiver.clear();
datanode.metrics.setDataNodeActiveXceiversCount(0); datanode.metrics.setDataNodeActiveXceiversCount(0);
datanode.metrics.setDataNodeReadActiveXceiversCount(0);
datanode.metrics.setDataNodeWriteActiveXceiversCount(0);
this.noPeers.signalAll(); this.noPeers.signalAll();
} finally { } finally {
lock.unlock(); lock.unlock();

View File

@ -111,6 +111,12 @@ public class DataNodeMetrics {
@Metric("Count of active dataNode xceivers") @Metric("Count of active dataNode xceivers")
private MutableGaugeInt dataNodeActiveXceiversCount; private MutableGaugeInt dataNodeActiveXceiversCount;
@Metric("Count of read active dataNode xceivers")
private MutableGaugeInt dataNodeReadActiveXceiversCount;
@Metric("Count of write active dataNode xceivers")
private MutableGaugeInt dataNodeWriteActiveXceiversCount;
@Metric("Count of active DataNode packetResponder") @Metric("Count of active DataNode packetResponder")
private MutableGaugeInt dataNodePacketResponderCount; private MutableGaugeInt dataNodePacketResponderCount;
@ -599,6 +605,30 @@ public int getDataNodeActiveXceiverCount() {
return dataNodeActiveXceiversCount.value(); return dataNodeActiveXceiversCount.value();
} }
public void incrDataNodeReadActiveXceiversCount(){
dataNodeReadActiveXceiversCount.incr();
}
public void decrDataNodeReadActiveXceiversCount(){
dataNodeReadActiveXceiversCount.decr();
}
public void setDataNodeReadActiveXceiversCount(int value){
dataNodeReadActiveXceiversCount.set(value);
}
public void incrDataNodeWriteActiveXceiversCount(){
dataNodeWriteActiveXceiversCount.incr();
}
public void decrDataNodeWriteActiveXceiversCount(){
dataNodeWriteActiveXceiversCount.decr();
}
public void setDataNodeWriteActiveXceiversCount(int value){
dataNodeWriteActiveXceiversCount.set(value);
}
public void incrDataNodePacketResponderCount() { public void incrDataNodePacketResponderCount() {
dataNodePacketResponderCount.incr(); dataNodePacketResponderCount.incr();
} }

View File

@ -38,6 +38,8 @@
import java.util.function.Supplier; import java.util.function.Supplier;
import net.jcip.annotations.NotThreadSafe; import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
@ -751,4 +753,67 @@ public void testNodeLocalMetrics() throws Exception {
} }
} }
} }
@Test
public void testDataNodeReadWriteXceiversCount() throws Exception {
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build()) {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
List<DataNode> datanodes = cluster.getDataNodes();
assertEquals(1, datanodes.size());
DataNode datanode = datanodes.get(0);
// Test DataNodeWriteActiveXceiversCount Metric
long writeXceiversCount = MetricsAsserts.getIntGauge("DataNodeWriteActiveXceiversCount",
getMetrics(datanode.getMetrics().name()));
assertEquals(0, writeXceiversCount);
Path path = new Path("/testDataNodeReadWriteXceiversCount.txt");
try (FSDataOutputStream output = fs.create(path)) {
output.write(new byte[1024]);
output.hsync();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int writeXceiversCount = MetricsAsserts.getIntGauge("DataNodeWriteActiveXceiversCount",
getMetrics(datanode.getMetrics().name()));
return writeXceiversCount == 1;
}
}, 100, 10000);
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int writeXceiversCount = MetricsAsserts.getIntGauge("DataNodeWriteActiveXceiversCount",
getMetrics(datanode.getMetrics().name()));
return writeXceiversCount == 0;
}
}, 100, 10000);
// Test DataNodeReadActiveXceiversCount Metric
long readXceiversCount = MetricsAsserts.getIntGauge("DataNodeReadActiveXceiversCount",
getMetrics(datanode.getMetrics().name()));
assertEquals(0, readXceiversCount);
try (FSDataInputStream input = fs.open(path)) {
byte[] byteArray = new byte[1024];
input.read(byteArray);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int readXceiversCount = MetricsAsserts.getIntGauge("DataNodeReadActiveXceiversCount",
getMetrics(datanode.getMetrics().name()));
return readXceiversCount == 1;
}
}, 100, 10000);
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int readXceiversCount = MetricsAsserts.getIntGauge("DataNodeReadActiveXceiversCount",
getMetrics(datanode.getMetrics().name()));
return readXceiversCount == 0;
}
}, 100, 10000);
}
}
} }