diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index 602ac008fa..e1501fadce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -169,6 +169,7 @@ public void testReceivePacketSlowMetrics() throws Exception { conf.setInt(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, interval); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3).build(); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); try { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); @@ -190,22 +191,30 @@ public Object answer(InvocationOnMock invocationOnMock) DataNodeFaultInjector.set(injector); Path testFile = new Path("/testFlushNanosMetric.txt"); FSDataOutputStream fout = fs.create(testFile); + DFSOutputStream dout = (DFSOutputStream) fout.getWrappedStream(); fout.write(new byte[1]); fout.hsync(); + DatanodeInfo[] pipeline = dout.getPipeline(); fout.close(); + dout.close(); + DatanodeInfo headDatanodeInfo = pipeline[0]; List datanodes = cluster.getDataNodes(); - DataNode datanode = datanodes.get(0); - MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name()); + DataNode headNode = datanodes.stream().filter(d -> d.getDatanodeId().equals(headDatanodeInfo)) + .findFirst().orElseGet(null); + assertNotNull("Could not find the head of the datanode write pipeline", + headNode); + MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics().name()); assertTrue("More than 1 packet received", - getLongCounter("TotalPacketsReceived", dnMetrics) > 1L); + getLongCounter("PacketsReceived", dnMetrics) > 1L); assertTrue("More than 1 slow packet to mirror", - getLongCounter("TotalPacketsSlowWriteToMirror", dnMetrics) > 1L); - assertCounter("TotalPacketsSlowWriteToDisk", 1L, dnMetrics); - assertCounter("TotalPacketsSlowWriteToOsCache", 0L, dnMetrics); + getLongCounter("PacketsSlowWriteToMirror", dnMetrics) > 1L); + assertCounter("PacketsSlowWriteToDisk", 1L, dnMetrics); + assertCounter("PacketsSlowWriteToOsCache", 0L, dnMetrics); } finally { if (cluster != null) { cluster.shutdown(); } + DataNodeFaultInjector.set(oldInjector); } }