diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java index 2c1f591a1f..cc0ebdf8b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java @@ -55,8 +55,10 @@ public class InstrumentedLock implements Lock { // Tracking counters for lock statistics. private volatile long lockAcquireTimestamp; - private final AtomicLong lastLogTimestamp; - private final AtomicLong warningsSuppressed = new AtomicLong(0); + private final AtomicLong lastHoldLogTimestamp; + private final AtomicLong lastWaitLogTimestamp; + private final SuppressedStats holdStats = new SuppressedStats(); + private final SuppressedStats waitStats = new SuppressedStats(); /** * Create a instrumented lock instance which logs a warning message @@ -91,19 +93,24 @@ public InstrumentedLock(String name, Logger logger, Lock lock, this.logger = logger; minLoggingGap = minLoggingGapMs; lockWarningThreshold = lockWarningThresholdMs; - lastLogTimestamp = new AtomicLong( + lastHoldLogTimestamp = new AtomicLong( clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold)); + lastWaitLogTimestamp = new AtomicLong(lastHoldLogTimestamp.get()); } @Override public void lock() { + long waitStart = clock.monotonicNow(); lock.lock(); + check(waitStart, clock.monotonicNow(), false); startLockTiming(); } @Override public void lockInterruptibly() throws InterruptedException { + long waitStart = clock.monotonicNow(); lock.lockInterruptibly(); + check(waitStart, clock.monotonicNow(), false); startLockTiming(); } @@ -118,11 +125,14 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + long waitStart = clock.monotonicNow(); + boolean retval = false; if (lock.tryLock(time, unit)) { startLockTiming(); - return true; + retval = true; } - return false; + check(waitStart, clock.monotonicNow(), false); + return retval; } @Override @@ -130,7 +140,7 @@ public void unlock() { long localLockReleaseTime = clock.monotonicNow(); long localLockAcquireTime = lockAcquireTimestamp; lock.unlock(); - check(localLockAcquireTime, localLockReleaseTime); + check(localLockAcquireTime, localLockReleaseTime, true); } @Override @@ -139,12 +149,25 @@ public Condition newCondition() { } @VisibleForTesting - void logWarning(long lockHeldTime, long suppressed) { + void logWarning(long lockHeldTime, SuppressedSnapshot stats) { logger.warn(String.format("Lock held time above threshold: " + "lock identifier: %s " + "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " + + "Longest suppressed LockHeldTimeMs=%d. " + "The stack trace is: %s" , - name, lockHeldTime, suppressed, + name, lockHeldTime, stats.getSuppressedCount(), + stats.getMaxSuppressedWait(), + StringUtils.getStackTrace(Thread.currentThread()))); + } + + @VisibleForTesting + void logWaitWarning(long lockWaitTime, SuppressedSnapshot stats) { + logger.warn(String.format("Waited above threshold to acquire lock: " + + "lock identifier: %s " + + "waitTimeMs=%d ms. Suppressed %d lock wait warnings. " + + "Longest suppressed WaitTimeMs=%d. " + + "The stack trace is: %s", name, lockWaitTime, + stats.getSuppressedCount(), stats.getMaxSuppressedWait(), StringUtils.getStackTrace(Thread.currentThread()))); } @@ -163,27 +186,41 @@ protected void startLockTiming() { * @param acquireTime - timestamp just after acquiring the lock. * @param releaseTime - timestamp just before releasing the lock. */ - protected void check(long acquireTime, long releaseTime) { + protected void check(long acquireTime, long releaseTime, + boolean checkLockHeld) { if (!logger.isWarnEnabled()) { return; } final long lockHeldTime = releaseTime - acquireTime; if (lockWarningThreshold - lockHeldTime < 0) { + AtomicLong lastLogTime; + SuppressedStats stats; + if (checkLockHeld) { + lastLogTime = lastHoldLogTimestamp; + stats = holdStats; + } else { + lastLogTime = lastWaitLogTimestamp; + stats = waitStats; + } long now; long localLastLogTs; do { now = clock.monotonicNow(); - localLastLogTs = lastLogTimestamp.get(); + localLastLogTs = lastLogTime.get(); long deltaSinceLastLog = now - localLastLogTs; // check should print log or not if (deltaSinceLastLog - minLoggingGap < 0) { - warningsSuppressed.incrementAndGet(); + stats.incrementSuppressed(lockHeldTime); return; } - } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now)); - long suppressed = warningsSuppressed.getAndSet(0); - logWarning(lockHeldTime, suppressed); + } while (!lastLogTime.compareAndSet(localLastLogTs, now)); + SuppressedSnapshot statsSnapshot = stats.snapshot(); + if (checkLockHeld) { + logWarning(lockHeldTime, statsSnapshot); + } else { + logWaitWarning(lockHeldTime, statsSnapshot); + } } } @@ -194,4 +231,60 @@ protected Lock getLock() { protected Timer getTimer() { return clock; } + + /** + * Internal class to track statistics about suppressed log messages in an + * atomic way. + */ + private static class SuppressedStats { + private long suppressedCount = 0; + private long maxSuppressedWait = 0; + + /** + * Increments the suppressed counter and increases the max wait time if the + * passed wait is greater than the current maxSuppressedWait. + * @param wait The wait time for this suppressed message + */ + synchronized public void incrementSuppressed(long wait) { + suppressedCount++; + if (wait > maxSuppressedWait) { + maxSuppressedWait = wait; + } + } + + /** + * Captures the current value of the counts into a SuppressedSnapshot object + * and resets the values to zero. + * + * @return SuppressedSnapshot containing the current value of the counters + */ + synchronized public SuppressedSnapshot snapshot() { + SuppressedSnapshot snap = + new SuppressedSnapshot(suppressedCount, maxSuppressedWait); + suppressedCount = 0; + maxSuppressedWait = 0; + return snap; + } + } + + /** + * Immutable class to capture a snapshot of suppressed log message stats. + */ + protected static class SuppressedSnapshot { + private long suppressedCount = 0; + private long maxSuppressedWait = 0; + + public SuppressedSnapshot(long suppressedCount, long maxWait) { + this.suppressedCount = suppressedCount; + this.maxSuppressedWait = maxWait; + } + + public long getMaxSuppressedWait() { + return maxSuppressedWait; + } + + public long getSuppressedCount() { + return suppressedCount; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java index e115718478..8ab392ed04 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java @@ -75,7 +75,7 @@ public void unlock() { getLock().unlock(); if (needReport) { readLockHeldTimeStamp.remove(); - check(localLockAcquireTime, localLockReleaseTime); + check(localLockAcquireTime, localLockReleaseTime, true); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java index 44158ec5b0..c47ff0712d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.util; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,12 +119,14 @@ public long monotonicNow() { final AtomicLong wlogged = new AtomicLong(0); final AtomicLong wsuppresed = new AtomicLong(0); + final AtomicLong wMaxWait = new AtomicLong(0); InstrumentedLock lock = new InstrumentedLock( testname, LOG, mlock, 2000, 300, mclock) { @Override - void logWarning(long lockHeldTime, long suppressed) { + void logWarning(long lockHeldTime, SuppressedSnapshot stats) { wlogged.incrementAndGet(); - wsuppresed.set(suppressed); + wsuppresed.set(stats.getSuppressedCount()); + wMaxWait.set(stats.getMaxSuppressedWait()); } }; @@ -132,12 +136,14 @@ void logWarning(long lockHeldTime, long suppressed) { lock.unlock(); // t = 200 assertEquals(0, wlogged.get()); assertEquals(0, wsuppresed.get()); + assertEquals(0, wMaxWait.get()); lock.lock(); // t = 200 time.set(700); lock.unlock(); // t = 700 assertEquals(1, wlogged.get()); assertEquals(0, wsuppresed.get()); + assertEquals(0, wMaxWait.get()); // despite the lock held time is greater than threshold // suppress the log warning due to the logging gap @@ -147,6 +153,7 @@ void logWarning(long lockHeldTime, long suppressed) { lock.unlock(); // t = 1100 assertEquals(1, wlogged.get()); assertEquals(0, wsuppresed.get()); + assertEquals(0, wMaxWait.get()); // log a warning message when the lock held time is greater the threshold // and the logging time gap is satisfied. Also should display suppressed @@ -157,6 +164,106 @@ void logWarning(long lockHeldTime, long suppressed) { lock.unlock(); // t = 2800 assertEquals(2, wlogged.get()); assertEquals(1, wsuppresed.get()); + assertEquals(400, wMaxWait.get()); + } + + /** + * Test the lock logs warning when lock wait / queue time is greater than + * threshold and not log warning otherwise. + * @throws Exception + */ + @Test(timeout=10000) + public void testLockLongWaitReport() throws Exception { + String testname = name.getMethodName(); + final AtomicLong time = new AtomicLong(0); + Timer mclock = new Timer() { + @Override + public long monotonicNow() { + return time.get(); + } + }; + Lock mlock = new ReentrantLock(true); //mock(Lock.class); + + final AtomicLong wlogged = new AtomicLong(0); + final AtomicLong wsuppresed = new AtomicLong(0); + final AtomicLong wMaxWait = new AtomicLong(0); + InstrumentedLock lock = new InstrumentedLock( + testname, LOG, mlock, 2000, 300, mclock) { + @Override + void logWaitWarning(long lockHeldTime, SuppressedSnapshot stats) { + wlogged.incrementAndGet(); + wsuppresed.set(stats.getSuppressedCount()); + wMaxWait.set(stats.getMaxSuppressedWait()); + } + }; + + // do not log warning when the lock held time is short + lock.lock(); // t = 0 + + Thread competingThread = lockUnlockThread(lock); + time.set(200); + lock.unlock(); // t = 200 + competingThread.join(); + assertEquals(0, wlogged.get()); + assertEquals(0, wsuppresed.get()); + assertEquals(0, wMaxWait.get()); + + + lock.lock(); // t = 200 + competingThread = lockUnlockThread(lock); + time.set(700); + lock.unlock(); // t = 700 + competingThread.join(); + + // The competing thread will have waited for 500ms, so it should log + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + assertEquals(0, wMaxWait.get()); + + // despite the lock wait time is greater than threshold + // suppress the log warning due to the logging gap + // (not recorded in wsuppressed until next log message) + lock.lock(); // t = 700 + competingThread = lockUnlockThread(lock); + time.set(1100); + lock.unlock(); // t = 1100 + competingThread.join(); + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + assertEquals(0, wMaxWait.get()); + + // log a warning message when the lock held time is greater the threshold + // and the logging time gap is satisfied. Also should display suppressed + // previous warnings. + time.set(2400); + lock.lock(); // t = 2400 + competingThread = lockUnlockThread(lock); + time.set(2800); + lock.unlock(); // t = 2800 + competingThread.join(); + assertEquals(2, wlogged.get()); + assertEquals(1, wsuppresed.get()); + assertEquals(400, wMaxWait.get()); + } + + private Thread lockUnlockThread(Lock lock) throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + assertFalse(lock.tryLock()); + countDownLatch.countDown(); + lock.lock(); + } finally { + lock.unlock(); + } + }); + t.start(); + countDownLatch.await(); + // Even with the countdown latch, the main thread releases the lock + // before this thread actually starts waiting on it, so introducing a + // short sleep so the competing thread can block on the lock as intended. + Thread.sleep(3); + return t; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java index 3e1a88bd0a..1ea3ef1860 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java @@ -146,9 +146,10 @@ public long monotonicNow() { InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG, readWriteLock, 2000, 300, mclock) { @Override - protected void logWarning(long lockHeldTime, long suppressed) { + protected void logWarning( + long lockHeldTime, SuppressedSnapshot stats) { wlogged.incrementAndGet(); - wsuppresed.set(suppressed); + wsuppresed.set(stats.getSuppressedCount()); } }; @@ -200,9 +201,9 @@ public long monotonicNow() { InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG, readWriteLock, 2000, 300, mclock) { @Override - protected void logWarning(long lockHeldTime, long suppressed) { + protected void logWarning(long lockHeldTime, SuppressedSnapshot stats) { wlogged.incrementAndGet(); - wsuppresed.set(suppressed); + wsuppresed.set(stats.getSuppressedCount()); } };