HDFS-16090. Fine grained lock for datanodeNetworkCounts (#3148)

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
    Signed-off-by: stack Chuang <stack@apache.org>
    Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
Viraj Jasani 2021-07-01 01:25:07 +05:30 committed by stack
parent 628b08db63
commit bf6ed21104

View File

@ -85,6 +85,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -315,6 +316,8 @@ public class DataNode extends ReconfigurableBase
private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
private final FileIoProvider fileIoProvider;
private static final String NETWORK_ERRORS = "networkErrors";
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
*/
@ -351,8 +354,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private DataNodePeerMetrics peerMetrics;
private DataNodeDiskMetrics diskMetrics;
private InetSocketAddress streamingAddr;
// See the note below in incrDatanodeNetworkErrors re: concurrency.
private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
private String hostName;
@ -523,9 +525,9 @@ private static Tracer createTracer(Configuration conf) {
.maximumSize(dncCacheMaxSize)
.build(new CacheLoader<String, Map<String, Long>>() {
@Override
public Map<String, Long> load(String key) throws Exception {
final Map<String, Long> ret = new HashMap<String, Long>();
ret.put("networkErrors", 0L);
public Map<String, Long> load(String key) {
final Map<String, Long> ret = new ConcurrentHashMap<>();
ret.put(NETWORK_ERRORS, 0L);
return ret;
}
});
@ -2257,19 +2259,11 @@ public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
void incrDatanodeNetworkErrors(String host) {
metrics.incrDatanodeNetworkErrors();
/*
* Synchronizing on the whole cache is a big hammer, but since it's only
* accumulating errors, it should be ok. If this is ever expanded to include
* non-error stats, then finer-grained concurrency should be applied.
*/
synchronized (datanodeNetworkCounts) {
try {
final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
datanodeNetworkCounts.put(host, curCount);
} catch (ExecutionException e) {
LOG.warn("failed to increment network error counts for host: {}", host);
}
try {
datanodeNetworkCounts.get(host).compute(NETWORK_ERRORS,
(key, errors) -> errors == null ? 1L : errors + 1L);
} catch (ExecutionException e) {
LOG.warn("Failed to increment network error counts for host: {}", host);
}
}