From 2d4f3e567e4bb8068c028de12df118a4f3fa6343 Mon Sep 17 00:00:00 2001
From: "Aaron T. Myers"
Date: Fri, 21 Nov 2014 16:34:08 -0800
Subject: [PATCH] HDFS-7331. Add Datanode network counts to datanode jmx page.
 Contributed by Charles Lamb.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java      |  2 +
 .../hadoop/hdfs/server/datanode/DataNode.java      | 47 +++++++++++++++++++
 .../hdfs/server/datanode/DataNodeMXBean.java       |  7 +++
 .../hdfs/server/datanode/DataXceiver.java          | 27 +++++++----
 .../server/datanode/TestDataNodeMetrics.java       | 18 +++++++
 6 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 408a6ed3a9..3f12cec424 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -385,6 +385,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7420. Delegate permission checks to FSDirectory. (wheat9)
 
+    HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
+    via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index af18f4dad1..78cae9cb25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -155,6 +155,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
   public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
+  public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
+  public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
 
   // This setting is for testing/internal use only.
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a53698a179..2ff6870a40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -38,6 +38,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -77,6 +79,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -84,6 +87,9 @@
 
 import javax.management.ObjectName;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -299,6 +305,9 @@ public static InetSocketAddress createSocketAddr(String target) {
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
 
+  // See the note below in incrDatanodeNetworkErrors re: concurrency.
+  private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
+
   private String hostName;
   private DatanodeID id;
@@ -414,6 +423,20 @@ public static InetSocketAddress createSocketAddr(String target) {
       shutdown();
       throw ie;
     }
+    final int dncCacheMaxSize =
+        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
+            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
+    datanodeNetworkCounts =
+        CacheBuilder.newBuilder()
+            .maximumSize(dncCacheMaxSize)
+            .build(new CacheLoader<String, Map<String, Long>>() {
+              @Override
+              public Map<String, Long> load(String key) throws Exception {
+                final Map<String, Long> ret = new HashMap<String, Long>();
+                ret.put("networkErrors", 0L);
+                return ret;
+              }
+            });
   }
 
   @Override
@@ -1767,6 +1790,30 @@ private void handleDiskError(String errMsgr) {
   public int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
+
+  @Override // DataNodeMXBean
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
+    return datanodeNetworkCounts.asMap();
+  }
+
+  void incrDatanodeNetworkErrors(String host) {
+    metrics.incrDatanodeNetworkErrors();
+
+    /*
+     * Synchronizing on the whole cache is a big hammer, but since it's only
+     * accumulating errors, it should be ok. If this is ever expanded to include
+     * non-error stats, then finer-grained concurrency should be applied.
+     */
+    synchronized (datanodeNetworkCounts) {
+      try {
+        final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
+        curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
+        datanodeNetworkCounts.put(host, curCount);
+      } catch (ExecutionException e) {
+        LOG.warn("Failed to increment network error counts for " + host);
+      }
+    }
+  }
 
   int getXmitsInProgress() {
     return xmitsInProgress.get();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 8e80c58742..92abd886fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -20,6 +20,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import java.util.Map;
+
 /**
  *
  * This is the JMX management interface for data node information
@@ -76,4 +78,9 @@ public interface DataNodeMXBean {
    * actively transferring blocks.
    */
   public int getXceiverCount();
+
+  /**
+   * Gets the network error counts on a per-Datanode basis.
+   */
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a235c20cc6..61b9c675ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -97,6 +97,7 @@ class DataXceiver extends Receiver implements Runnable {
   private Peer peer;
   private final String remoteAddress; // address of remote side
+  private final String remoteAddressWithoutPort; // only the address, no port
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
   private final DNConf dnConf;
@@ -129,6 +130,9 @@ private DataXceiver(Peer peer, DataNode datanode,
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
     remoteAddress = peer.getRemoteAddressString();
+    final int colonIdx = remoteAddress.indexOf(':');
+    remoteAddressWithoutPort =
+        (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
     localAddress = peer.getLocalAddressString();
 
     if (LOG.isDebugEnabled()) {
@@ -222,7 +226,7 @@ public void run() {
             LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
           }
         } else {
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
           throw err;
         }
         break;
@@ -521,7 +525,7 @@ public void readBlock(final ExtendedBlock block,
         } catch (IOException ioe) {
           LOG.debug("Error reading client status response. Will close connection.", ioe);
           IOUtils.closeStream(out);
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
         }
       } else {
         IOUtils.closeStream(out);
       }
@@ -543,7 +547,7 @@ public void readBlock(final ExtendedBlock block,
       if (!(ioe instanceof SocketTimeoutException)) {
         LOG.warn(dnR + ":Got exception while serving " + block + " to "
             + remoteAddress, ioe);
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -722,7 +726,7 @@ public void writeBlock(final ExtendedBlock block,
           LOG.info(datanode + ":Exception transfering " +
                    block + " to mirror " + mirrorNode +
                    "- continuing without the mirror", e);
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
         }
       }
     }
@@ -777,7 +781,7 @@ public void writeBlock(final ExtendedBlock block,
     } catch (IOException ioe) {
       LOG.info("opWriteBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       // close all opened streams
@@ -813,7 +817,7 @@ public void transferBlock(final ExtendedBlock blk,
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -908,7 +912,7 @@ public void blockChecksum(final ExtendedBlock block,
       out.flush();
     } catch (IOException ioe) {
       LOG.info("blockChecksum " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -975,7 +979,7 @@ public void copyBlock(final ExtendedBlock block,
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
@@ -1108,7 +1112,7 @@ public void replaceBlock(final ExtendedBlock block,
       LOG.info(errMsg);
       if (!IoeDuringCopyBlockOperation) {
         // Don't double count IO errors
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -1128,7 +1132,7 @@ public void replaceBlock(final ExtendedBlock block,
       sendResponse(opStatus, errMsg);
     } catch (IOException ioe) {
       LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
     }
     IOUtils.closeStream(proxyOut);
     IOUtils.closeStream(blockReceiver);
@@ -1182,6 +1186,9 @@ private void writeSuccessWithChecksumInfo(BlockSender blockSender,
     out.flush();
   }
 
+  private void incrDatanodeNetworkErrors() {
+    datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
+  }
 
   private void checkAccess(OutputStream out, final boolean reply,
       final ExtendedBlock blk,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 90112af245..0b85d35b0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -27,7 +27,9 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
@@ -48,6 +50,9 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 public class TestDataNodeMetrics {
   private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
 
@@ -217,9 +222,22 @@ public void testTimeoutMetric() throws Exception {
       out.writeBytes("old gs data\n");
       out.hflush();
 
+      /* Test the metric. */
       final MetricsRecordBuilder dnMetrics =
           getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
       assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
+
+      /* Test JMX datanode network counts. */
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      final Object dnc =
+          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
+      final String allDnc = dnc.toString();
+      assertTrue("expected to see loopback address",
+          allDnc.indexOf("127.0.0.1") >= 0);
+      assertTrue("expected to see networkErrors",
+          allDnc.indexOf("networkErrors") >= 0);
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
       if (cluster != null) {
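
Illustrative usage (not part of the patch): once a datanode carrying this change is running, the new per-host counts show up on the datanode's /jmx page, which is what the issue title refers to. The sketch below is a minimal standalone reader, assuming a datanode HTTP server reachable at localhost:50075 (the usual 2.x default for dfs.datanode.http.address); only the MBean name "Hadoop:service=DataNode,name=DataNodeInfo" and the attribute name "DatanodeNetworkCounts" are taken from the patch and its test, everything else is a hypothetical example.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class DatanodeNetworkCountsProbe {
  public static void main(String[] args) throws Exception {
    // Assumption: datanode HTTP address is localhost:50075; adjust as needed.
    URL url = new URL(
        "http://localhost:50075/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        // With this patch applied, the JSON for DataNodeInfo includes a
        // "DatanodeNetworkCounts" entry mapping each remote host to its
        // counters, currently just "networkErrors".
        System.out.println(line);
      }
    }
  }
}

The same attribute can of course be read through any JMX client (JConsole, in-process MBeanServer as the test does); the HTTP /jmx route is shown here only because it needs no extra JMX configuration on the datanode.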