diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java index 0abcf989d1..7e7ececb32 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java @@ -20,6 +20,9 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; +import java.util.Iterator; +import java.util.function.Consumer; + /** * A low memory footprint {@link GSet} implementation, * which uses an array for storing the elements @@ -86,17 +89,36 @@ public LightWeightResizableGSet(int initCapacity) { } @Override - public E put(final E element) { + public synchronized E put(final E element) { E existing = super.put(element); expandIfNecessary(); return existing; } + @Override + public synchronized E get(K key) { + return super.get(key); + } + + @Override + public synchronized E remove(K key) { + return super.remove(key); + } + + @Override + public synchronized int size() { + return super.size(); + } + + public synchronized void getIterator(Consumer> consumer) { + consumer.accept(super.values().iterator()); + } + /** * Resize the internal table to given capacity. */ @SuppressWarnings("unchecked") - protected void resize(int cap) { + protected synchronized void resize(int cap) { int newCapacity = actualArrayLength(cap); if (newCapacity == this.capacity) { return; @@ -121,7 +143,7 @@ protected void resize(int cap) { /** * Checks if we need to expand, and expands if necessary. */ - protected void expandIfNecessary() { + protected synchronized void expandIfNecessary() { if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) { resize(capacity * 2); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 14f9cd7730..b14e92d42d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1658,6 +1658,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { DFS_NAMESERVICES_RESOLVER_IMPL = "dfs.datanode.nameservices.resolver.impl"; + public static final String + DFS_DATANODE_LOCKMANAGER_TRACE = + "dfs.datanode.lockmanager.trace"; + + public static final boolean + DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseDataSetLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseDataSetLock.java new file mode 100644 index 0000000000..bf6edda7ab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseDataSetLock.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +import org.apache.hadoop.util.AutoCloseableLock; +import org.apache.hadoop.util.StringUtils; + +import java.util.concurrent.locks.Lock; + +import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG; + +/** + * Extending AutoCloseableLock such that the users can + * use a try-with-resource syntax. + */ +public class AutoCloseDataSetLock extends AutoCloseableLock { + private Lock lock; + private AutoCloseDataSetLock parentLock; + private DataNodeLockManager dataNodeLockManager; + + public AutoCloseDataSetLock(Lock lock) { + this.lock = lock; + } + + @Override + public void close() { + if (lock != null) { + lock.unlock(); + if (dataNodeLockManager != null) { + dataNodeLockManager.hook(); + } + } else { + LOG.error("Try to unlock null lock" + + StringUtils.getStackTrace(Thread.currentThread())); + } + if (parentLock != null) { + parentLock.close(); + } + } + + /** + * Actually acquire the lock. + */ + public void lock() { + if (lock != null) { + lock.lock(); + return; + } + LOG.error("Try to lock null lock" + + StringUtils.getStackTrace(Thread.currentThread())); + } + + public void setParentLock(AutoCloseDataSetLock parent) { + if (parentLock == null) { + this.parentLock = parent; + } + } + + public void setDataNodeLockManager(DataNodeLockManager + dataNodeLockManager) { + this.dataNodeLockManager = dataNodeLockManager; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java new file mode 100644 index 0000000000..e7a3b38357 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +/** + * Use for manage a set of lock for datanode. + */ +public interface DataNodeLockManager { + + /** + * Acquire block pool level first if you want to Acquire volume lock. + * Or only acquire block pool level lock. + */ + enum LockLevel { + BLOCK_POOl, + VOLUME + } + + /** + * Acquire readLock and then lock. + */ + T readLock(LockLevel level, String... resources); + + /** + * Acquire writeLock and then lock. + */ + T writeLock(LockLevel level, String... resources); + + /** + * Add a lock to LockManager. + */ + void addLock(LockLevel level, String... resources); + + /** + * Remove a lock from LockManager. + */ + void removeLock(LockLevel level, String... resources); + + /** + * LockManager may need to back hook. + */ + void hook(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java new file mode 100644 index 0000000000..848495cc4e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +import java.util.concurrent.locks.Lock; + +/** + * Some ut or temp replicaMap not need to lock with DataSetLockManager. + */ +public class NoLockManager implements DataNodeLockManager { + private final NoDataSetLock lock = new NoDataSetLock(null); + + private static final class NoDataSetLock extends AutoCloseDataSetLock { + + private NoDataSetLock(Lock lock) { + super(lock); + } + + @Override + public void lock() { + } + + @Override + public void close() { + } + } + + public NoLockManager() { + } + + @Override + public AutoCloseDataSetLock readLock(LockLevel level, String... resources) { + return lock; + } + + @Override + public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) { + return lock; + } + + @Override + public void addLock(LockLevel level, String... resources) { + } + + @Override + public void removeLock(LockLevel level, String... resources) { + } + + @Override + public void hook() { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java new file mode 100644 index 0000000000..1d59f87ab2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock; +import org.apache.hadoop.hdfs.server.common.DataNodeLockManager; + +import java.util.HashMap; +import java.util.Stack; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Class for maintain a set of lock for fsDataSetImpl. + */ +public class DataSetLockManager implements DataNodeLockManager { + public static final Log LOG = LogFactory.getLog(DataSetLockManager.class); + private final HashMap threadCountMap = new HashMap<>(); + private final LockMap lockMap = new LockMap(); + private boolean isFair = true; + private final boolean openLockTrace; + private Exception lastException; + + /** + * Class for maintain lockMap and is thread safe. + */ + private class LockMap { + private final HashMap readlockMap = new HashMap<>(); + private final HashMap writeLockMap = new HashMap<>(); + + public synchronized void addLock(String name, ReentrantReadWriteLock lock) { + AutoCloseDataSetLock readLock = new AutoCloseDataSetLock(lock.readLock()); + AutoCloseDataSetLock writeLock = new AutoCloseDataSetLock(lock.writeLock()); + if (openLockTrace) { + readLock.setDataNodeLockManager(DataSetLockManager.this); + writeLock.setDataNodeLockManager(DataSetLockManager.this); + } + readlockMap.putIfAbsent(name, readLock); + writeLockMap.putIfAbsent(name, writeLock); + } + + public synchronized void removeLock(String name) { + if (!readlockMap.containsKey(name) || !writeLockMap.containsKey(name)) { + LOG.error("The lock " + name + " is not in LockMap"); + } + readlockMap.remove(name); + writeLockMap.remove(name); + } + + public synchronized AutoCloseDataSetLock getReadLock(String name) { + return readlockMap.get(name); + } + + public synchronized AutoCloseDataSetLock getWriteLock(String name) { + return writeLockMap.get(name); + } + } + + /** + * Generate lock order string concatenates with lock name. + * @param level which level lock want to acquire. + * @param resources lock name by lock order. + * @return lock order string concatenates with lock name. + */ + private String generateLockName(LockLevel level, String... resources) { + if (resources.length == 1 && level == LockLevel.BLOCK_POOl) { + if (resources[0] == null) { + throw new IllegalArgumentException("acquire a null block pool lock"); + } + return resources[0]; + } else if (resources.length == 2 && level == LockLevel.VOLUME) { + if (resources[0] == null || resources[1] == null) { + throw new IllegalArgumentException("acquire a null bp lock : " + + resources[0] + "volume lock :" + resources[1]); + } + return resources[0] + resources[1]; + } else { + throw new IllegalArgumentException("lock level do not match resource"); + } + } + + /** + * Class for record thread acquire lock stack trace and count. + */ + private static class TrackLog { + private final Stack logStack = new Stack<>(); + private int lockCount = 0; + private final String threadName; + + TrackLog(String threadName) { + this.threadName = threadName; + incrLockCount(); + } + + public void incrLockCount() { + logStack.push(new Exception("lock stack trace")); + lockCount += 1; + } + + public void decrLockCount() { + logStack.pop(); + lockCount -= 1; + } + + public void showLockMessage() { + LOG.error("hold lock thread name is:" + threadName + + " hold count is:" + lockCount); + while (!logStack.isEmpty()) { + Exception e = logStack.pop(); + LOG.error("lock stack ", e); + } + } + + public boolean shouldClear() { + return lockCount == 1; + } + } + + public DataSetLockManager(Configuration conf) { + this.isFair = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY, + DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT); + this.openLockTrace = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE, + DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT); + } + + public DataSetLockManager() { + this.openLockTrace = true; + } + + @Override + public AutoCloseDataSetLock readLock(LockLevel level, String... resources) { + if (level == LockLevel.BLOCK_POOl) { + return getReadLock(level, resources[0]); + } else { + AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); + AutoCloseDataSetLock volLock = getReadLock(level, resources); + volLock.setParentLock(bpLock); + if (openLockTrace) { + LOG.info("Sub lock " + resources[0] + resources[1] + " parent lock " + + resources[0]); + } + return volLock; + } + } + + @Override + public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) { + if (level == LockLevel.BLOCK_POOl) { + return getWriteLock(level, resources[0]); + } else { + AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); + AutoCloseDataSetLock volLock = getWriteLock(level, resources); + volLock.setParentLock(bpLock); + if (openLockTrace) { + LOG.info("Sub lock " + resources[0] + resources[1] + " parent lock " + + resources[0]); + } + return volLock; + } + } + + /** + * Return a not null ReadLock. + */ + private AutoCloseDataSetLock getReadLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + AutoCloseDataSetLock lock = lockMap.getReadLock(lockName); + if (lock == null) { + LOG.warn("Ignore this error during dn restart: Not existing readLock " + + lockName); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + lock = lockMap.getReadLock(lockName); + } + lock.lock(); + if (openLockTrace) { + putThreadName(getThreadName()); + } + return lock; + } + + /** + * Return a not null WriteLock. + */ + private AutoCloseDataSetLock getWriteLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + AutoCloseDataSetLock lock = lockMap.getWriteLock(lockName); + if (lock == null) { + LOG.warn("Ignore this error during dn restart: Not existing writeLock" + + lockName); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + lock = lockMap.getWriteLock(lockName); + } + lock.lock(); + if (openLockTrace) { + putThreadName(getThreadName()); + } + return lock; + } + + @Override + public void addLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + if (level == LockLevel.BLOCK_POOl) { + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + } else { + lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair)); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + } + } + + @Override + public void removeLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + try (AutoCloseDataSetLock lock = writeLock(level, resources)) { + lock.lock(); + lockMap.removeLock(lockName); + } + } + + @Override + public void hook() { + if (openLockTrace) { + removeThreadName(getThreadName()); + } + } + + /** + * Add thread name when lock a lock. + */ + private synchronized void putThreadName(String thread) { + if (threadCountMap.containsKey(thread)) { + TrackLog trackLog = threadCountMap.get(thread); + trackLog.incrLockCount(); + } + threadCountMap.putIfAbsent(thread, new TrackLog(thread)); + } + + public void lockLeakCheck() { + if (!openLockTrace) { + LOG.warn("not open lock leak check func"); + return; + } + if (threadCountMap.isEmpty()) { + LOG.warn("all lock has release"); + return; + } + setLastException(new Exception("lock Leak")); + threadCountMap.forEach((name, trackLog) -> trackLog.showLockMessage()); + } + + /** + * Remove thread name when unlock a lock. + */ + private synchronized void removeThreadName(String thread) { + if (threadCountMap.containsKey(thread)) { + TrackLog trackLog = threadCountMap.get(thread); + if (trackLog.shouldClear()) { + threadCountMap.remove(thread); + return; + } + trackLog.decrLockCount(); + } + } + + private void setLastException(Exception e) { + this.lastException = e; + } + + public Exception getLastException() { + return lastException; + } + + private String getThreadName() { + return Thread.currentThread().getName() + Thread.currentThread().getId(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 38f4ac7cb4..584ae24ab1 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6356,4 +6356,14 @@ times, we should mark it as a badnode. + + + dfs.datanode.lockmanager.trace + false + + If this is true, after shut down datanode lock Manager will print all leak + thread that not release by lock Manager. Only used for test or trace dead lock + problem. In produce default set false, because it's have little performance loss. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java new file mode 100644 index 0000000000..b514accdf1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock; +import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestDataSetLockManager { + private DataSetLockManager manager; + + @Before + public void init() { + manager = new DataSetLockManager(); + } + + @Test(timeout = 5000) + public void testBaseFunc() { + manager.addLock(LockLevel.BLOCK_POOl, "BPtest"); + manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + + AutoCloseDataSetLock lock = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest"); + AutoCloseDataSetLock lock1 = manager.readLock(LockLevel.BLOCK_POOl, "BPtest"); + lock1.close(); + lock.close(); + + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + + AutoCloseDataSetLock lock2 = manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + AutoCloseDataSetLock lock3 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + lock3.close(); + lock2.close(); + + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + + AutoCloseDataSetLock lock4 = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest"); + AutoCloseDataSetLock lock5 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + lock5.close(); + lock4.close(); + + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + + manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + manager.lockLeakCheck(); + + Exception lastException = manager.getLastException(); + assertEquals(lastException.getMessage(), "lock Leak"); + } + + @Test(timeout = 5000) + public void testAcquireWriteLockError() throws InterruptedException { + Thread t = new Thread(() -> { + manager.readLock(LockLevel.BLOCK_POOl, "test"); + manager.writeLock(LockLevel.BLOCK_POOl, "test"); + }); + t.start(); + Thread.sleep(1000); + manager.lockLeakCheck(); + Exception lastException = manager.getLastException(); + assertEquals(lastException.getMessage(), "lock Leak"); + } + + @Test(timeout = 5000) + public void testLockLeakCheck() { + manager.writeLock(LockLevel.BLOCK_POOl, "test"); + manager.lockLeakCheck(); + Exception lastException = manager.getLastException(); + assertEquals(lastException.getMessage(), "lock Leak"); + } +}