From 011f3b24d4bfda505a90ab5b5576916a41f869c5 Mon Sep 17 00:00:00 2001 From: Chris Douglas Date: Thu, 8 Sep 2016 17:52:02 -0700 Subject: [PATCH] HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang --- .../apache/hadoop/util/AutoCloseableLock.java | 28 ++- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 + .../apache/hadoop/hdfs/InstrumentedLock.java | 185 ++++++++++++++++++ .../fsdataset/impl/FsDatasetImpl.java | 10 +- .../src/main/resources/hdfs-default.xml | 8 + .../hadoop/hdfs/TestInstrumentedLock.java | 166 ++++++++++++++++ 6 files changed, 395 insertions(+), 7 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java index 2aa85782e3..d920bc63c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java @@ -17,22 +17,33 @@ */ package org.apache.hadoop.util; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; + /** * This is a wrap class of a ReentrantLock. Extending AutoCloseable * interface such that the users can use a try-with-resource syntax. */ public class AutoCloseableLock implements AutoCloseable { - private final ReentrantLock lock; + private final Lock lock; /** * Creates an instance of {@code AutoCloseableLock}, initializes - * the underlying {@code ReentrantLock} object. + * the underlying lock instance with a new {@code ReentrantLock}. */ public AutoCloseableLock() { - this.lock = new ReentrantLock(); + this(new ReentrantLock()); + } + + /** + * Wrap provided Lock instance. + * @param lock Lock instance to wrap in AutoCloseable API. + */ + public AutoCloseableLock(Lock lock) { + this.lock = lock; } /** @@ -86,7 +97,7 @@ public void close() { /** * A wrapper method that makes a call to {@code tryLock()} of - * the underlying {@code ReentrantLock} object. + * the underlying {@code Lock} object. * * If the lock is not held by another thread, acquires the lock, set the * hold count to one and returns {@code true}. @@ -116,7 +127,12 @@ public boolean tryLock() { * @return {@code true} if any thread holds this lock and * {@code false} otherwise */ - public boolean isLocked() { - return lock.isLocked(); + @VisibleForTesting + boolean isLocked() { + if (lock instanceof ReentrantLock) { + return ((ReentrantLock)lock).isLocked(); + } + throw new UnsupportedOperationException(); } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 7bdfd440d5..caf6b6078d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -419,6 +419,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY = "dfs.namenode.read-lock-reporting-threshold-ms"; public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L; + // Threshold for how long the lock warnings must be suppressed + public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY = + "dfs.lock.suppress.warning.interval"; + public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT = + 10000; //ms public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor"; public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java new file mode 100644 index 0000000000..6279e95522 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.commons.logging.Log; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Timer; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This is a debugging class that can be used by callers to track + * whether a specifc lock is being held for too long and periodically + * log a warning and stack trace, if so. + * + * The logged warnings are throttled so that logs are not spammed. + * + * A new instance of InstrumentedLock can be created for each object + * that needs to be instrumented. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class InstrumentedLock implements Lock { + + private final Lock lock; + private final Log logger; + private final String name; + private final Timer clock; + + /** Minimum gap between two lock warnings. */ + private final long minLoggingGap; + /** Threshold for detecting long lock held time. */ + private final long lockWarningThreshold; + + // Tracking counters for lock statistics. + private volatile long lockAcquireTimestamp; + private final AtomicLong lastLogTimestamp; + private final AtomicLong warningsSuppressed = new AtomicLong(0); + + /** + * Create a instrumented lock instance which logs a warning message + * when lock held time is above given threshold. + * + * @param name the identifier of the lock object + * @param logger this class does not have its own logger, will log to the + * given logger instead + * @param minLoggingGapMs the minimum time gap between two log messages, + * this is to avoid spamming to many logs + * @param lockWarningThresholdMs the time threshold to view lock held + * time as being "too long" + */ + public InstrumentedLock(String name, Log logger, long minLoggingGapMs, + long lockWarningThresholdMs) { + this(name, logger, new ReentrantLock(), + minLoggingGapMs, lockWarningThresholdMs); + } + + public InstrumentedLock(String name, Log logger, Lock lock, + long minLoggingGapMs, long lockWarningThresholdMs) { + this(name, logger, lock, + minLoggingGapMs, lockWarningThresholdMs, new Timer()); + } + + @VisibleForTesting + InstrumentedLock(String name, Log logger, Lock lock, + long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { + this.name = name; + this.lock = lock; + this.clock = clock; + this.logger = logger; + minLoggingGap = minLoggingGapMs; + lockWarningThreshold = lockWarningThresholdMs; + lastLogTimestamp = new AtomicLong( + clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold)); + } + + @Override + public void lock() { + lock.lock(); + lockAcquireTimestamp = clock.monotonicNow(); + } + + @Override + public void lockInterruptibly() throws InterruptedException { + lock.lockInterruptibly(); + lockAcquireTimestamp = clock.monotonicNow(); + } + + @Override + public boolean tryLock() { + if (lock.tryLock()) { + lockAcquireTimestamp = clock.monotonicNow(); + return true; + } + return false; + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + if (lock.tryLock(time, unit)) { + lockAcquireTimestamp = clock.monotonicNow(); + return true; + } + return false; + } + + @Override + public void unlock() { + long localLockReleaseTime = clock.monotonicNow(); + long localLockAcquireTime = lockAcquireTimestamp; + lock.unlock(); + check(localLockAcquireTime, localLockReleaseTime); + } + + @Override + public Condition newCondition() { + return lock.newCondition(); + } + + @VisibleForTesting + void logWarning(long lockHeldTime, long suppressed) { + logger.warn(String.format("Lock held time above threshold: " + + "lock identifier: %s " + + "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " + + "The stack trace is: %s" , + name, lockHeldTime, suppressed, + StringUtils.getStackTrace(Thread.currentThread()))); + } + + /** + * Log a warning if the lock was held for too long. + * + * Should be invoked by the caller immediately AFTER releasing the lock. + * + * @param acquireTime - timestamp just after acquiring the lock. + * @param releaseTime - timestamp just before releasing the lock. + */ + private void check(long acquireTime, long releaseTime) { + if (!logger.isWarnEnabled()) { + return; + } + + final long lockHeldTime = releaseTime - acquireTime; + if (lockWarningThreshold - lockHeldTime < 0) { + long now; + long localLastLogTs; + do { + now = clock.monotonicNow(); + localLastLogTs = lastLogTimestamp.get(); + long deltaSinceLastLog = now - localLastLogTs; + // check should print log or not + if (deltaSinceLastLog - minLoggingGap < 0) { + warningsSuppressed.incrementAndGet(); + return; + } + } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now)); + long suppressed = warningsSuppressed.getAndSet(0); + logWarning(lockHeldTime, suppressed); + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index ffd2f8ab34..e5da0e5b1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -40,6 +40,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; @@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.InstrumentedLock; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -278,7 +280,13 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) this.dataStorage = storage; this.conf = conf; this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); - this.datasetLock = new AutoCloseableLock(); + this.datasetLock = new AutoCloseableLock( + new InstrumentedLock(getClass().getName(), LOG, + conf.getTimeDuration( + DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + 300)); // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 3f78233e62..3a5de3e46c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4273,4 +4273,12 @@ a plan. + + + dfs.lock.suppress.warning.interval + 10s + Instrumentation reporting long critical sections will suppress + consecutive warnings within this interval. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java new file mode 100644 index 0000000000..f470688a18 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; + +import org.apache.hadoop.util.AutoCloseableLock; +import org.apache.hadoop.util.Timer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +/** + * A test class for InstrumentedLock. + */ +public class TestInstrumentedLock { + + static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class); + + @Rule public TestName name = new TestName(); + + /** + * Test exclusive access of the lock. + * @throws Exception + */ + @Test(timeout=10000) + public void testMultipleThread() throws Exception { + String testname = name.getMethodName(); + InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300); + lock.lock(); + try { + Thread competingThread = new Thread() { + @Override + public void run() { + assertFalse(lock.tryLock()); + } + }; + competingThread.start(); + competingThread.join(); + } finally { + lock.unlock(); + } + } + + /** + * Test the correctness with try-with-resource syntax. + * @throws Exception + */ + @Test(timeout=10000) + public void testTryWithResourceSyntax() throws Exception { + String testname = name.getMethodName(); + final AtomicReference lockThread = new AtomicReference<>(null); + Lock lock = new InstrumentedLock(testname, LOG, 0, 300) { + @Override + public void lock() { + super.lock(); + lockThread.set(Thread.currentThread()); + } + @Override + public void unlock() { + super.unlock(); + lockThread.set(null); + } + }; + AutoCloseableLock acl = new AutoCloseableLock(lock); + try (AutoCloseable localLock = acl.acquire()) { + assertEquals(acl, localLock); + Thread competingThread = new Thread() { + @Override + public void run() { + assertNotEquals(Thread.currentThread(), lockThread.get()); + assertFalse(lock.tryLock()); + } + }; + competingThread.start(); + competingThread.join(); + assertEquals(Thread.currentThread(), lockThread.get()); + } + assertNull(lockThread.get()); + } + + /** + * Test the lock logs warning when lock held time is greater than threshold + * and not log warning otherwise. + * @throws Exception + */ + @Test(timeout=10000) + public void testLockLongHoldingReport() throws Exception { + String testname = name.getMethodName(); + final AtomicLong time = new AtomicLong(0); + Timer mclock = new Timer() { + @Override + public long monotonicNow() { + return time.get(); + } + }; + Lock mlock = mock(Lock.class); + + final AtomicLong wlogged = new AtomicLong(0); + final AtomicLong wsuppresed = new AtomicLong(0); + InstrumentedLock lock = new InstrumentedLock( + testname, LOG, mlock, 2000, 300, mclock) { + @Override + void logWarning(long lockHeldTime, long suppressed) { + wlogged.incrementAndGet(); + wsuppresed.set(suppressed); + } + }; + + // do not log warning when the lock held time is short + lock.lock(); // t = 0 + time.set(200); + lock.unlock(); // t = 200 + assertEquals(0, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + lock.lock(); // t = 200 + time.set(700); + lock.unlock(); // t = 700 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + // despite the lock held time is greater than threshold + // suppress the log warning due to the logging gap + // (not recorded in wsuppressed until next log message) + lock.lock(); // t = 700 + time.set(1100); + lock.unlock(); // t = 1100 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + // log a warning message when the lock held time is greater the threshold + // and the logging time gap is satisfied. Also should display suppressed + // previous warnings. + time.set(2400); + lock.lock(); // t = 2400 + time.set(2800); + lock.unlock(); // t = 2800 + assertEquals(2, wlogged.get()); + assertEquals(1, wsuppresed.get()); + } + +}