From bf6ed21104c1c2ae5171e36afc68e70286d8c90e Mon Sep 17 00:00:00 2001
From: Viraj Jasani <vjasani@apache.org>
Date: Thu, 1 Jul 2021 01:25:07 +0530
Subject: [PATCH] HDFS-16090. Fine grained lock for datanodeNetworkCounts
 (#3148)

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
---
 .../hadoop/hdfs/server/datanode/DataNode.java | 30 ++++++++-----------
 1 file changed, 12 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 446cab6bd5..473ef065c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -85,6 +85,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -315,6 +316,8 @@ public class DataNode extends ReconfigurableBase
   private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
   private final FileIoProvider fileIoProvider;
 
+  private static final String NETWORK_ERRORS = "networkErrors";
+
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
    */
@@ -351,8 +354,7 @@ public static InetSocketAddress createSocketAddr(String target) {
   private DataNodePeerMetrics peerMetrics;
   private DataNodeDiskMetrics diskMetrics;
   private InetSocketAddress streamingAddr;
-
-  // See the note below in incrDatanodeNetworkErrors re: concurrency.
+  private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
 
   private String hostName;
 
@@ -523,9 +525,9 @@ private static Tracer createTracer(Configuration conf) {
         .maximumSize(dncCacheMaxSize)
         .build(new CacheLoader<String, Map<String, Long>>() {
           @Override
-          public Map<String, Long> load(String key) throws Exception {
-            final Map<String, Long> ret = new HashMap<String, Long>();
-            ret.put("networkErrors", 0L);
+          public Map<String, Long> load(String key) {
+            final Map<String, Long> ret = new ConcurrentHashMap<>();
+            ret.put(NETWORK_ERRORS, 0L);
             return ret;
           }
         });
@@ -2257,19 +2259,11 @@ public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
 
   void incrDatanodeNetworkErrors(String host) {
     metrics.incrDatanodeNetworkErrors();
-    /*
-     * Synchronizing on the whole cache is a big hammer, but since it's only
-     * accumulating errors, it should be ok. If this is ever expanded to include
-     * non-error stats, then finer-grained concurrency should be applied.
-     */
-    synchronized (datanodeNetworkCounts) {
-      try {
-        final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
-        curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
-        datanodeNetworkCounts.put(host, curCount);
-      } catch (ExecutionException e) {
-        LOG.warn("failed to increment network error counts for host: {}", host);
-      }
+    try {
+      datanodeNetworkCounts.get(host).compute(NETWORK_ERRORS,
+          (key, errors) -> errors == null ? 1L : errors + 1L);
+    } catch (ExecutionException e) {
+      LOG.warn("Failed to increment network error counts for host: {}", host);
     }
   }
 