HDFS-15975. Use LongAdder instead of AtomicLong for branch-3.3 (#2940)

Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
Author: litao
Date:   2021-04-27 21:39:25 +08:00 (committed by GitHub)
Parent: 6649e5888b
Commit: f45365f201
8 changed files with 53 additions and 52 deletions
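
The change is mechanical across all eight files: each write-heavy counter moves from AtomicLong to java.util.concurrent.atomic.LongAdder, with incrementAndGet()/addAndGet(n) becoming increment()/add(n), get() becoming longValue(), and set(0) becoming reset(). A minimal before/after sketch of the pattern, for illustration only (this class is not part of the commit):

// Illustrative sketch (not Hadoop code): the counter pattern this commit migrates.
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class CounterMigrationSketch {
  // Before: every update CASes a single shared cell, which contends badly
  // when many threads bump the same counter.
  private final AtomicLong oldStyle = new AtomicLong();

  // After: LongAdder stripes updates across per-thread cells and sums them
  // on read, keeping writes cheap under contention.
  private final LongAdder newStyle = new LongAdder();

  public void onEvent(long delta) {
    oldStyle.addAndGet(delta);   // old write path
    newStyle.add(delta);         // new write path
  }

  public long oldValue() {
    return oldStyle.get();       // atomic point-in-time read
  }

  public long newValue() {
    return newStyle.longValue(); // sums the cells; not a snapshot under concurrent writes
  }
}

Note that LongAdder.longValue() is not an atomic snapshot while writers are active, and reset() is only reliable when no concurrent updates are in flight; both are acceptable trade-offs for the write-mostly metrics counters touched here.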

MutableCounterLong.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 /**
  * A mutable long counter
@@ -32,11 +32,11 @@ import java.util.concurrent.atomic.AtomicLong;
 @InterfaceStability.Evolving
 public class MutableCounterLong extends MutableCounter {
-  private AtomicLong value = new AtomicLong();
+  private final LongAdder value = new LongAdder();
   public MutableCounterLong(MetricsInfo info, long initValue) {
     super(info);
-    this.value.set(initValue);
+    this.value.add(initValue);
   }
   @Override
@@ -49,12 +49,12 @@ public class MutableCounterLong extends MutableCounter {
    * @param delta of the increment
    */
   public void incr(long delta) {
-    value.addAndGet(delta);
+    value.add(delta);
     setChanged();
   }
   public long value() {
-    return value.get();
+    return value.longValue();
   }
   @Override

DFSHedgedReadMetrics.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 /**
  * The client-side metrics for hedged read feature.
@@ -28,20 +28,20 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 public class DFSHedgedReadMetrics {
-  public final AtomicLong hedgedReadOps = new AtomicLong();
-  public final AtomicLong hedgedReadOpsWin = new AtomicLong();
-  public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
+  public final LongAdder hedgedReadOps = new LongAdder();
+  public final LongAdder hedgedReadOpsWin = new LongAdder();
+  public final LongAdder hedgedReadOpsInCurThread = new LongAdder();
   public void incHedgedReadOps() {
-    hedgedReadOps.incrementAndGet();
+    hedgedReadOps.increment();
   }
   public void incHedgedReadOpsInCurThread() {
-    hedgedReadOpsInCurThread.incrementAndGet();
+    hedgedReadOpsInCurThread.increment();
   }
   public void incHedgedReadWins() {
-    hedgedReadOpsWin.incrementAndGet();
+    hedgedReadOpsWin.increment();
   }
   public long getHedgedReadOps() {

DFSOpsCountStatistics.java

@@ -26,7 +26,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 /**
  * This storage statistics tracks how many times each DFS operation was issued.
@@ -140,21 +140,21 @@ public class DFSOpsCountStatistics extends StorageStatistics {
   public static final String NAME = "DFSOpsCountStatistics";
-  private final Map<OpType, AtomicLong> opsCount = new EnumMap<>(OpType.class);
+  private final Map<OpType, LongAdder> opsCount = new EnumMap<>(OpType.class);
   public DFSOpsCountStatistics() {
     super(NAME);
     for (OpType opType : OpType.values()) {
-      opsCount.put(opType, new AtomicLong(0));
+      opsCount.put(opType, new LongAdder());
     }
   }
   public void incrementOpCounter(OpType op) {
-    opsCount.get(op).addAndGet(1);
+    opsCount.get(op).increment();
   }
   private class LongIterator implements Iterator<LongStatistic> {
-    private Iterator<Entry<OpType, AtomicLong>> iterator =
+    private final Iterator<Entry<OpType, LongAdder>> iterator =
         opsCount.entrySet().iterator();
     @Override
@@ -167,9 +167,9 @@ public class DFSOpsCountStatistics extends StorageStatistics {
       if (!iterator.hasNext()) {
         throw new NoSuchElementException();
       }
-      final Entry<OpType, AtomicLong> entry = iterator.next();
+      final Entry<OpType, LongAdder> entry = iterator.next();
       return new LongStatistic(entry.getKey().getSymbol(),
-          entry.getValue().get());
+          entry.getValue().longValue());
     }
     @Override
@@ -191,7 +191,7 @@ public class DFSOpsCountStatistics extends StorageStatistics {
   @Override
   public Long getLong(String key) {
     final OpType type = OpType.fromSymbol(key);
-    return type == null ? null : opsCount.get(type).get();
+    return type == null ? null : opsCount.get(type).longValue();
   }
   @Override
@@ -201,8 +201,8 @@ public class DFSOpsCountStatistics extends StorageStatistics {
   @Override
   public void reset() {
-    for (AtomicLong count : opsCount.values()) {
-      count.set(0);
+    for (LongAdder count : opsCount.values()) {
+      count.reset();
     }
   }

FsDatasetCache.java

@@ -42,7 +42,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -120,7 +120,7 @@ public class FsDatasetCache {
   private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
       new HashMap<ExtendedBlockId, Value>();
-  private final AtomicLong numBlocksCached = new AtomicLong(0);
+  private final LongAdder numBlocksCached = new LongAdder();
   private final FsDatasetImpl dataset;
@@ -143,11 +143,11 @@ public class FsDatasetCache {
   /**
   * Number of cache commands that could not be completed successfully
   */
-  final AtomicLong numBlocksFailedToCache = new AtomicLong(0);
+  final LongAdder numBlocksFailedToCache = new LongAdder();
   /**
   * Number of uncache commands that could not be completed successfully
   */
-  final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
+  final LongAdder numBlocksFailedToUncache = new LongAdder();
   public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
@@ -204,7 +204,7 @@ public class FsDatasetCache {
     for (Map.Entry<ExtendedBlockId, MappableBlock> entry : entrySet) {
       mappableBlockMap.put(entry.getKey(),
          new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED));
-      numBlocksCached.addAndGet(1);
+      numBlocksCached.increment();
       dataset.datanode.getMetrics().incrBlocksCached(1);
     }
   }
@@ -278,7 +278,7 @@ public class FsDatasetCache {
      LOG.debug("Block with id {}, pool {} already exists in the "
              + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
      );
-      numBlocksFailedToCache.incrementAndGet();
+      numBlocksFailedToCache.increment();
      return;
    }
    mappableBlockMap.put(key, new Value(null, State.CACHING));
@@ -301,7 +301,7 @@ public class FsDatasetCache {
      LOG.debug("Block with id {}, pool {} does not need to be uncached, "
          + "because it is not currently in the mappableBlockMap.", blockId,
          bpid);
-      numBlocksFailedToUncache.incrementAndGet();
+      numBlocksFailedToUncache.increment();
      return;
    }
    switch (prevValue.state) {
@@ -331,7 +331,7 @@ public class FsDatasetCache {
    default:
      LOG.debug("Block with id {}, pool {} does not need to be uncached, "
          + "because it is in state {}.", blockId, bpid, prevValue.state);
-      numBlocksFailedToUncache.incrementAndGet();
+      numBlocksFailedToUncache.increment();
      break;
    }
  }
@@ -469,7 +469,7 @@ public class FsDatasetCache {
        dataset.datanode.
            getShortCircuitRegistry().processBlockMlockEvent(key);
      }
-      numBlocksCached.addAndGet(1);
+      numBlocksCached.increment();
      dataset.datanode.getMetrics().incrBlocksCached(1);
      success = true;
    } finally {
@@ -482,7 +482,7 @@ public class FsDatasetCache {
      LOG.debug("Caching of {} was aborted. We are now caching only {} "
          + "bytes in total.", key, cacheLoader.getCacheUsed());
      IOUtils.closeQuietly(mappableBlock);
-      numBlocksFailedToCache.incrementAndGet();
+      numBlocksFailedToCache.increment();
      synchronized (FsDatasetCache.this) {
        mappableBlockMap.remove(key);
@@ -561,7 +561,7 @@ public class FsDatasetCache {
    }
    long newUsedBytes = cacheLoader.
        release(key, value.mappableBlock.getLength());
-    numBlocksCached.addAndGet(-1);
+    numBlocksCached.decrement();
    dataset.datanode.getMetrics().incrBlocksUncached(1);
    if (revocationTimeMs != 0) {
      LOG.debug("Uncaching of {} completed. usedBytes = {}",
@@ -607,15 +607,15 @@ public class FsDatasetCache {
   }
   public long getNumBlocksFailedToCache() {
-    return numBlocksFailedToCache.get();
+    return numBlocksFailedToCache.longValue();
   }
   public long getNumBlocksFailedToUncache() {
-    return numBlocksFailedToUncache.get();
+    return numBlocksFailedToUncache.longValue();
   }
   public long getNumBlocksCached() {
-    return numBlocksCached.get();
+    return numBlocksCached.longValue();
   }
   public synchronized boolean isCached(String bpid, long blockId) {

FsDatasetImpl.java

@@ -2264,7 +2264,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       success = true;
     } finally {
       if (!success) {
-        cacheManager.numBlocksFailedToCache.incrementAndGet();
+        cacheManager.numBlocksFailedToCache.increment();
       }
     }
     blockFileName = info.getBlockURI().toString();

FSEditLog.java

@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -182,7 +182,7 @@ public class FSEditLog implements LogsPurgeable {
   // these are statistics counters.
   private long numTransactions; // number of transactions
-  private final AtomicLong numTransactionsBatchedInSync = new AtomicLong();
+  private final LongAdder numTransactionsBatchedInSync = new LongAdder();
   private long totalTimeTransactions; // total time for all transactions
   private NameNodeMetrics metrics;
@@ -730,7 +730,7 @@ public class FSEditLog implements LogsPurgeable {
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
         metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
-        numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
+        numTransactionsBatchedInSync.add(editsBatchedInSync);
       }
     } finally {
@@ -770,7 +770,7 @@ public class FSEditLog implements LogsPurgeable {
         .append(" Total time for transactions(ms): ")
         .append(totalTimeTransactions)
         .append(" Number of transactions batched in Syncs: ")
-        .append(numTransactionsBatchedInSync.get())
+        .append(numTransactionsBatchedInSync.longValue())
         .append(" Number of syncs: ")
         .append(editLogStream.getNumSync())
         .append(" SyncTimes(ms): ")
@@ -1402,7 +1402,7 @@ public class FSEditLog implements LogsPurgeable {
     numTransactions = 0;
     totalTimeTransactions = 0;
-    numTransactionsBatchedInSync.set(0L);
+    numTransactionsBatchedInSync.reset();
     // TODO no need to link this back to storage anymore!
     // See HDFS-2174.

FSNamesystemLock.java

@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -112,12 +113,12 @@ class FSNamesystemLock {
   * The number of time the read lock
   * has been held longer than the threshold.
   */
-  private final AtomicLong numReadLockLongHold = new AtomicLong(0);
+  private final LongAdder numReadLockLongHold = new LongAdder();
  /**
   * The number of time the write lock
   * has been held for longer than the threshold.
   */
-  private final AtomicLong numWriteLockLongHold = new AtomicLong(0);
+  private final LongAdder numWriteLockLongHold = new LongAdder();
  @VisibleForTesting
  static final String OP_NAME_OTHER = "OTHER";
@@ -186,7 +187,7 @@ class FSNamesystemLock {
    final long readLockIntervalMs =
        TimeUnit.NANOSECONDS.toMillis(readLockIntervalNanos);
    if (needReport && readLockIntervalMs >= this.readLockReportingThresholdMs) {
-      numReadLockLongHold.incrementAndGet();
+      numReadLockLongHold.increment();
      LockHeldInfo localLockHeldInfo;
      do {
        localLockHeldInfo = longestReadLockHeldInfo.get();
@@ -264,7 +265,7 @@ class FSNamesystemLock {
    LogAction logAction = LogThrottlingHelper.DO_NOT_LOG;
    if (needReport &&
        writeLockIntervalMs >= this.writeLockReportingThresholdMs) {
-      numWriteLockLongHold.incrementAndGet();
+      numWriteLockLongHold.increment();
      if (longestWriteLockHeldInfo.getIntervalMs() < writeLockIntervalMs) {
        longestWriteLockHeldInfo =
            new LockHeldInfo(currentTimeMs, writeLockIntervalMs,
@@ -322,7 +323,7 @@ class FSNamesystemLock {
   * has been held longer than the threshold
   */
  public long getNumOfReadLockLongHold() {
-    return numReadLockLongHold.get();
+    return numReadLockLongHold.longValue();
  }
  /**
@@ -333,7 +334,7 @@ class FSNamesystemLock {
   * has been held longer than the threshold.
   */
  public long getNumOfWriteLockLongHold() {
-    return numWriteLockLongHold.get();
+    return numWriteLockLongHold.longValue();
  }
  /**

TestPread.java

@@ -401,9 +401,9 @@ public class TestPread {
     DFSClient dfsClient = fileSys.getClient();
     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
     // Metrics instance is static, so we need to reset counts from prior tests.
-    metrics.hedgedReadOps.set(0);
-    metrics.hedgedReadOpsWin.set(0);
-    metrics.hedgedReadOpsInCurThread.set(0);
+    metrics.hedgedReadOps.reset();
+    metrics.hedgedReadOpsWin.reset();
+    metrics.hedgedReadOpsInCurThread.reset();
     try {
       Path file1 = new Path("hedgedReadMaxOut.dat");
@@ -590,7 +590,7 @@ public class TestPread {
     String filename = "/hedgedReadMaxOut.dat";
     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
     // Metrics instance is static, so we need to reset counts from prior tests.
-    metrics.hedgedReadOps.set(0);
+    metrics.hedgedReadOps.reset();
     try {
       Path file = new Path(filename);
       output = fileSys.create(file, (short) 2);