diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index d2e2ee2adc..49156c251a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -544,7 +544,7 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) dn.getFSDataset().getCacheCapacity(), dn.getFSDataset().getCacheUsed(), dn.getXmitsInProgress(), - dn.getXceiverCount(), + dn.getActiveTransferThreadCount(), numFailedVolumes, volumeFailureSummary, requestBlockReportLease, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 8233d30f82..cc9551d1d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -1368,6 +1368,7 @@ public void close() { */ @Override public void run() { + datanode.metrics.incrDataNodePacketResponderCount(); boolean lastPacketInBlock = false; final long startTime = ClientTraceLog.isInfoEnabled() ? 
System.nanoTime() : 0; while (isRunning() && !lastPacketInBlock) { @@ -1505,6 +1506,9 @@ public void run() { } } } + // Any exception will be caught and processed in the previous loop, so we + // will always arrive here when the thread is exiting + datanode.metrics.decrDataNodePacketResponderCount(); LOG.info(myString + " terminating"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java index edd38f9389..d4687e8331 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java @@ -601,17 +601,22 @@ public Daemon recoverBlocks(final String who, Daemon d = new Daemon(datanode.threadGroup, new Runnable() { @Override public void run() { - for(RecoveringBlock b : blocks) { - try { - logRecoverBlock(who, b); - if (b.isStriped()) { - new RecoveryTaskStriped((RecoveringStripedBlock) b).recover(); - } else { - new RecoveryTaskContiguous(b).recover(); + datanode.metrics.incrDataNodeBlockRecoveryWorkerCount(); + try { + for (RecoveringBlock b : blocks) { + try { + logRecoverBlock(who, b); + if (b.isStriped()) { + new RecoveryTaskStriped((RecoveringStripedBlock) b).recover(); + } else { + new RecoveryTaskContiguous(b).recover(); + } + } catch (IOException e) { + LOG.warn("recoverBlock: {} FAILED", b, e); } - } catch (IOException e) { - LOG.warn("recoverBlocks FAILED: " + b, e); } + } finally { + datanode.metrics.decrDataNodeBlockRecoveryWorkerCount(); } } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c6b5e1ec89..4d2fba4573 100644 ---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2148,6 +2148,8 @@ public void shutdown() { } if (metrics != null) { metrics.setDataNodeActiveXceiversCount(0); + metrics.setDataNodePacketResponderCount(0); + metrics.setDataNodeBlockRecoveryWorkerCount(0); } // IPC server needs to be shutdown late in the process, otherwise @@ -2246,7 +2248,20 @@ private void handleDiskError(String failedVolumes, int failedNumber) { /** Number of concurrent xceivers per node. */ @Override // DataNodeMXBean public int getXceiverCount() { - return threadGroup == null ? 0 : threadGroup.activeCount(); + if (metrics == null) { + return 0; + } + return metrics.getDataNodeActiveXceiverCount(); + } + + @Override // DataNodeMXBean + public int getActiveTransferThreadCount() { + if (metrics == null) { + return 0; + } + return metrics.getDataNodeActiveXceiverCount() + + metrics.getDataNodePacketResponderCount() + + metrics.getDataNodeBlockRecoveryWorkerCount(); } @Override // DataNodeMXBean diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java index 9d11e14884..7a8f59bb66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java @@ -104,11 +104,15 @@ public interface DataNodeMXBean { public String getClusterId(); /** - * Returns an estimate of the number of Datanode threads - * actively transferring blocks. + * Returns the number of active xceivers. */ public int getXceiverCount(); + /** + * Returns the number of Datanode threads actively transferring blocks. 
+ */ + int getActiveTransferThreadCount(); + /** * Returns an estimate of the number of data replication/reconstruction tasks * running currently. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index cc802375f9..6e633147b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -109,6 +109,12 @@ public class DataNodeMetrics { @Metric("Count of active dataNode xceivers") private MutableGaugeInt dataNodeActiveXceiversCount; + @Metric("Count of active DataNode packetResponder") + private MutableGaugeInt dataNodePacketResponderCount; + + @Metric("Count of active DataNode block recovery worker") + private MutableGaugeInt dataNodeBlockRecoveryWorkerCount; + @Metric MutableRate readBlockOp; @Metric MutableRate writeBlockOp; @Metric MutableRate blockChecksumOp; @@ -525,6 +531,42 @@ public void setDataNodeActiveXceiversCount(int value) { dataNodeActiveXceiversCount.set(value); } + public int getDataNodeActiveXceiverCount() { + return dataNodeActiveXceiversCount.value(); + } + + public void incrDataNodePacketResponderCount() { + dataNodePacketResponderCount.incr(); + } + + public void decrDataNodePacketResponderCount() { + dataNodePacketResponderCount.decr(); + } + + public void setDataNodePacketResponderCount(int value) { + dataNodePacketResponderCount.set(value); + } + + public int getDataNodePacketResponderCount() { + return dataNodePacketResponderCount.value(); + } + + public void incrDataNodeBlockRecoveryWorkerCount() { + dataNodeBlockRecoveryWorkerCount.incr(); + } + + public void decrDataNodeBlockRecoveryWorkerCount() { + dataNodeBlockRecoveryWorkerCount.decr(); + } + + public void 
setDataNodeBlockRecoveryWorkerCount(int value) { + dataNodeBlockRecoveryWorkerCount.set(value); + } + + public int getDataNodeBlockRecoveryWorkerCount() { + return dataNodeBlockRecoveryWorkerCount.value(); + } + public void incrECDecodingTime(long decodingTimeNanos) { ecDecodingTimeNanos.incr(decodingTimeNanos); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java index f559783db8..0bf21ee1d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java @@ -253,12 +253,11 @@ private void assertXceiverCount(int expected) { } /** - * Returns the datanode's xceiver count, but subtracts 1, since the - * DataXceiverServer counts as one. + * Returns the datanode's active xceiver count. * - * @return int xceiver count, not including DataXceiverServer + * @return the datanode's active xceivers count. 
*/ private int getXceiverCountWithoutServer() { - return dn.getXceiverCount() - 1; + return dn.getXceiverCount(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index 4db7b4d194..85664ebc95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.lang.management.ManagementFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -374,6 +375,57 @@ public void testDatanodeActiveXceiversCount() throws Exception { } } + @Test + public void testDataNodeMXBeanActiveThreadCount() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + FileSystem fs = cluster.getFileSystem(); + Path p = new Path("/testfile"); + + try { + List datanodes = cluster.getDataNodes(); + assertEquals(1, datanodes.size()); + DataNode datanode = datanodes.get(0); + + // create a xceiver thread for write + FSDataOutputStream os = fs.create(p); + for (int i = 0; i < 1024; i++) { + os.write("testdatastr".getBytes()); + } + os.hsync(); + // create a xceiver thread for read + InputStream is = fs.open(p); + is.read(new byte[16], 0, 4); + + int threadCount = datanode.threadGroup.activeCount(); + assertTrue(threadCount > 0); + Thread[] threads = new Thread[threadCount]; + datanode.threadGroup.enumerate(threads); + int xceiverCount = 0; + int responderCount = 0; + int recoveryWorkerCount = 0; + for (Thread t : threads) { + if (t.getName().contains("DataXceiver for client")) { + 
xceiverCount++; + } else if (t.getName().contains("PacketResponder")) { + responderCount++; + } + } + assertEquals(2, xceiverCount); + assertEquals(1, responderCount); + assertEquals(0, recoveryWorkerCount); //not easy to produce + assertEquals(xceiverCount, datanode.getXceiverCount()); + assertEquals(xceiverCount + responderCount + recoveryWorkerCount, + datanode.getActiveTransferThreadCount()); + + is.close(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + @Test public void testDNShouldNotDeleteBlockONTooManyOpenFiles() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java index 4343b0acd0..c3abc12bf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java @@ -226,9 +226,9 @@ public void testXceiverCountInternal(int minMaintenanceR) throws Exception { triggerHeartbeats(datanodes); // check that all nodes are live and in service - int expectedTotalLoad = nodes; // xceiver server adds 1 to load + int expectedTotalLoad = 0; int expectedInServiceNodes = nodes; - int expectedInServiceLoad = nodes; + int expectedInServiceLoad = 0; checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); @@ -333,10 +333,7 @@ public void testXceiverCountInternal(int minMaintenanceR) throws Exception { expectedInServiceNodes--; } assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); - // live nodes always report load of 1. no nodes is load 0 - double expectedXceiverAvg = (i == nodes-1) ? 
0.0 : 1.0; - assertEquals((double)expectedXceiverAvg, - getInServiceXceiverAverage(namesystem), EPSILON); + assertEquals(0, getInServiceXceiverAverage(namesystem), EPSILON); } // final sanity check checkClusterHealth(0, namesystem, 0.0, 0, 0.0);