diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java index 2aa85782e3..d920bc63c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java @@ -17,22 +17,33 @@ */ package org.apache.hadoop.util; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; + /** * This is a wrap class of a ReentrantLock. Extending AutoCloseable * interface such that the users can use a try-with-resource syntax. */ public class AutoCloseableLock implements AutoCloseable { - private final ReentrantLock lock; + private final Lock lock; /** * Creates an instance of {@code AutoCloseableLock}, initializes - * the underlying {@code ReentrantLock} object. + * the underlying lock instance with a new {@code ReentrantLock}. */ public AutoCloseableLock() { - this.lock = new ReentrantLock(); + this(new ReentrantLock()); + } + + /** + * Wrap provided Lock instance. + * @param lock Lock instance to wrap in AutoCloseable API. + */ + public AutoCloseableLock(Lock lock) { + this.lock = lock; } /** @@ -86,7 +97,7 @@ public void close() { /** * A wrapper method that makes a call to {@code tryLock()} of - * the underlying {@code ReentrantLock} object. + * the underlying {@code Lock} object. * * If the lock is not held by another thread, acquires the lock, set the * hold count to one and returns {@code true}. @@ -116,7 +127,12 @@ public boolean tryLock() { * @return {@code true} if any thread holds this lock and * {@code false} otherwise */ - public boolean isLocked() { - return lock.isLocked(); + @VisibleForTesting + boolean isLocked() { + if (lock instanceof ReentrantLock) { + return ((ReentrantLock)lock).isLocked(); + } + throw new UnsupportedOperationException(); } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 7bdfd440d5..caf6b6078d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -419,6 +419,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY = "dfs.namenode.read-lock-reporting-threshold-ms"; public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L; + // Threshold for how long the lock warnings must be suppressed + public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY = + "dfs.lock.suppress.warning.interval"; + public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT = + 10000; //ms public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor"; public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java new file mode 100644 index 0000000000..6279e95522 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specifc lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+ private final Lock lock;
+ private final Log logger;
+ private final String name;
+ private final Timer clock;
+
+ /** Minimum gap between two lock warnings. */
+ private final long minLoggingGap;
+ /** Threshold for detecting long lock held time. */
+ private final long lockWarningThreshold;
+
+ // Tracking counters for lock statistics.
+ private volatile long lockAcquireTimestamp;
+ private final AtomicLong lastLogTimestamp;
+ private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+ /**
+ * Create a instrumented lock instance which logs a warning message
+ * when lock held time is above given threshold.
+ *
+ * @param name the identifier of the lock object
+ * @param logger this class does not have its own logger, will log to the
+ * given logger instead
+ * @param minLoggingGapMs the minimum time gap between two log messages,
+ * this is to avoid spamming to many logs
+ * @param lockWarningThresholdMs the time threshold to view lock held
+ * time as being "too long"
+ */
+ public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+ long lockWarningThresholdMs) {
+ this(name, logger, new ReentrantLock(),
+ minLoggingGapMs, lockWarningThresholdMs);
+ }
+
+ public InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, lock,
+ minLoggingGapMs, lockWarningThresholdMs, new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ this.name = name;
+ this.lock = lock;
+ this.clock = clock;
+ this.logger = logger;
+ minLoggingGap = minLoggingGapMs;
+ lockWarningThreshold = lockWarningThresholdMs;
+ lastLogTimestamp = new AtomicLong(
+ clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+ }
+
+ @Override
+ public void lock() {
+ lock.lock();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ lock.lockInterruptibly();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public boolean tryLock() {
+ if (lock.tryLock()) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ if (lock.tryLock(time, unit)) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void unlock() {
+ long localLockReleaseTime = clock.monotonicNow();
+ long localLockAcquireTime = lockAcquireTimestamp;
+ lock.unlock();
+ check(localLockAcquireTime, localLockReleaseTime);
+ }
+
+ @Override
+ public Condition newCondition() {
+ return lock.newCondition();
+ }
+
+ @VisibleForTesting
+ void logWarning(long lockHeldTime, long suppressed) {
+ logger.warn(String.format("Lock held time above threshold: " +
+ "lock identifier: %s " +
+ "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+ "The stack trace is: %s" ,
+ name, lockHeldTime, suppressed,
+ StringUtils.getStackTrace(Thread.currentThread())));
+ }
+
+ /**
+ * Log a warning if the lock was held for too long.
+ *
+ * Should be invoked by the caller immediately AFTER releasing the lock.
+ *
+ * @param acquireTime - timestamp just after acquiring the lock.
+ * @param releaseTime - timestamp just before releasing the lock.
+ */
+ private void check(long acquireTime, long releaseTime) {
+ if (!logger.isWarnEnabled()) {
+ return;
+ }
+
+ final long lockHeldTime = releaseTime - acquireTime;
+ if (lockWarningThreshold - lockHeldTime < 0) {
+ long now;
+ long localLastLogTs;
+ do {
+ now = clock.monotonicNow();
+ localLastLogTs = lastLogTimestamp.get();
+ long deltaSinceLastLog = now - localLastLogTs;
+ // check should print log or not
+ if (deltaSinceLastLog - minLoggingGap < 0) {
+ warningsSuppressed.incrementAndGet();
+ return;
+ }
+ } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+ long suppressed = warningsSuppressed.getAndSet(0);
+ logWarning(lockHeldTime, suppressed);
+ }
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ffd2f8ab34..e5da0e5b1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,6 +40,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -60,6 +61,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -278,7 +280,13 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
- this.datasetLock = new AutoCloseableLock();
+ this.datasetLock = new AutoCloseableLock(
+ new InstrumentedLock(getClass().getName(), LOG,
+ conf.getTimeDuration(
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
+ 300));
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3f78233e62..3a5de3e46c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4273,4 +4273,12 @@
a plan.
+
+
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+ static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+ @Rule public TestName name = new TestName();
+
+ /**
+ * Test exclusive access of the lock.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testMultipleThread() throws Exception {
+ String testname = name.getMethodName();
+ InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+ lock.lock();
+ try {
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Test the correctness with try-with-resource syntax.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testTryWithResourceSyntax() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicReference