HDDS-428. OzoneManager lock optimization.

Contributed by Nanda Kumar.
This commit is contained in:
Anu Engineer 2018-09-12 10:38:36 -07:00
parent 47b72c87eb
commit 6e2129cf4e
16 changed files with 832 additions and 125 deletions

View File

@ -100,6 +100,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>

View File

@ -81,4 +81,8 @@ private HddsConfigKeys() {
"hdds.scm.chillmode.threshold.pct";
public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
public static final String HDDS_LOCK_MAX_CONCURRENCY =
"hdds.lock.max.concurrency";
public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100;
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Lock implementation which also maintains counter.
*/
public final class ActiveLock {
private Lock lock;
private AtomicInteger count;
/**
* Use ActiveLock#newInstance to create instance.
*/
private ActiveLock() {
this.lock = new ReentrantLock();
this.count = new AtomicInteger(0);
}
/**
* Creates a new instance of ActiveLock.
*
* @return new ActiveLock
*/
public static ActiveLock newInstance() {
return new ActiveLock();
}
/**
* Acquires the lock.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*/
public void lock() {
lock.lock();
}
/**
* Releases the lock.
*/
public void unlock() {
lock.unlock();
}
/**
* Increment the active count of the lock.
*/
void incrementActiveCount() {
count.incrementAndGet();
}
/**
* Decrement the active count of the lock.
*/
void decrementActiveCount() {
count.decrementAndGet();
}
/**
* Returns the active count on the lock.
*
* @return Number of active leases on the lock.
*/
int getActiveLockCount() {
return count.get();
}
/**
* Resets the active count on the lock.
*/
void resetCounter() {
count.set(0);
}
@Override
public String toString() {
return lock.toString();
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lock;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Manages the locks on a given resource. A new lock is created for each
* and every unique resource. Uniqueness of resource depends on the
* {@code equals} implementation of it.
*/
public class LockManager<T> {
private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>();
private final GenericObjectPool<ActiveLock> lockPool =
new GenericObjectPool<>(new PooledLockFactory());
/**
* Creates new LockManager instance.
*
* @param conf Configuration object
*/
public LockManager(Configuration conf) {
int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
lockPool.setMaxTotal(maxPoolSize);
}
/**
* Acquires the lock on given resource.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*/
public void lock(T resource) {
activeLocks.compute(resource, (k, v) -> {
ActiveLock lock;
try {
if (v == null) {
lock = lockPool.borrowObject();
} else {
lock = v;
}
lock.incrementActiveCount();
} catch (Exception ex) {
LOG.error("Unable to obtain lock.", ex);
throw new RuntimeException(ex);
}
return lock;
}).lock();
}
/**
* Releases the lock on given resource.
*/
public void unlock(T resource) {
ActiveLock lock = activeLocks.get(resource);
if (lock == null) {
// Someone is releasing a lock which was never acquired. Log and return.
LOG.warn("Trying to release the lock on {}, which was never acquired.",
resource);
return;
}
lock.unlock();
activeLocks.computeIfPresent(resource, (k, v) -> {
v.decrementActiveCount();
if (v.getActiveLockCount() != 0) {
return v;
}
lockPool.returnObject(v);
return null;
});
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lock;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
/**
* Pool factory to create {@code ActiveLock} instances.
*/
public class PooledLockFactory extends BasePooledObjectFactory<ActiveLock> {
@Override
public ActiveLock create() throws Exception {
return ActiveLock.newInstance();
}
@Override
public PooledObject<ActiveLock> wrap(ActiveLock activeLock) {
return new DefaultPooledObject<>(activeLock);
}
@Override
public void activateObject(PooledObject<ActiveLock> pooledObject) {
pooledObject.getObject().resetCounter();
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lock;
/*
This package contains the lock related classes.
*/

View File

@ -1168,4 +1168,15 @@
(in compressed format), but doesn't require fast io access such as SSD.
</description>
</property>
<property>
<name>hdds.lock.max.concurrency</name>
<value>100</value>
<tag>HDDS</tag>
<description>Locks in HDDS/Ozone uses object pool to maintain active locks
in the system, this property defines the max limit for the locks that
will be maintained in the pool.
</description>
</property>
</configuration>

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lock;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test-cases to test LockManager.
*/
public class TestLockManager {
@Test(timeout = 1000)
public void testWithDifferentResource() {
LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
manager.lock("/resourceOne");
// This should work, as they are different resource.
manager.lock("/resourceTwo");
manager.unlock("/resourceOne");
manager.unlock("/resourceTwo");
Assert.assertTrue(true);
}
@Test
public void testWithSameResource() throws Exception {
LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
manager.lock("/resourceOne");
AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
manager.lock("/resourceOne");
gotLock.set(true);
manager.unlock("/resourceOne");
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same object, it will wait.
Assert.assertFalse(gotLock.get());
manager.unlock("/resourceOne");
// Since we have released the lock, the new thread should have the lock
// now
// Let's give some time for the new thread to run
Thread.sleep(100);
Assert.assertTrue(gotLock.get());
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lock;
/*
This package contains the lock related test classes.
*/

View File

@ -79,9 +79,10 @@ public BucketManagerImpl(OMMetadataManager metadataManager) {
@Override
public void createBucket(OmBucketInfo bucketInfo) throws IOException {
Preconditions.checkNotNull(bucketInfo);
metadataManager.writeLock().lock();
String volumeName = bucketInfo.getVolumeName();
String bucketName = bucketInfo.getBucketName();
metadataManager.getLock().acquireVolumeLock(volumeName);
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@ -118,7 +119,8 @@ public void createBucket(OmBucketInfo bucketInfo) throws IOException {
}
throw ex;
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
metadataManager.getLock().releaseVolumeLock(volumeName);
}
}
@ -133,7 +135,7 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName)
throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
metadataManager.readLock().lock();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
byte[] value = metadataManager.getBucketTable().get(bucketKey);
@ -151,7 +153,7 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName)
}
throw ex;
} finally {
metadataManager.readLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@ -164,18 +166,11 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@Override
public void setBucketProperty(OmBucketArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
//Check if volume exists
if (metadataManager.getVolumeTable()
.get(metadataManager.getVolumeKey(volumeName)) == null) {
LOG.debug("volume: {} not found ", volumeName);
throw new OMException("Volume doesn't exist",
OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
byte[] value = metadataManager.getBucketTable().get(bucketKey);
//Check if bucket exist
if (value == null) {
@ -230,7 +225,7 @@ public void setBucketProperty(OmBucketArgs args) throws IOException {
}
throw ex;
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@ -266,16 +261,8 @@ public void deleteBucket(String volumeName, String bucketName)
throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
metadataManager.writeLock().lock();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
//Check if volume exists
if (metadataManager.getVolumeTable()
.get(metadataManager.getVolumeKey(volumeName)) == null) {
LOG.debug("volume: {} not found ", volumeName);
throw new OMException("Volume doesn't exist",
OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
//Check if bucket exists
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
if (metadataManager.getBucketTable().get(bucketKey) == null) {
@ -297,7 +284,7 @@ public void deleteBucket(String volumeName, String bucketName)
}
throw ex;
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@ -309,12 +296,8 @@ public List<OmBucketInfo> listBuckets(String volumeName,
String startBucket, String bucketPrefix, int maxNumOfBuckets)
throws IOException {
Preconditions.checkNotNull(volumeName);
metadataManager.readLock().lock();
try {
return metadataManager.listBuckets(
volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
} finally {
metadataManager.readLock().unlock();
}
return metadataManager.listBuckets(
volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
}
}

View File

@ -139,44 +139,38 @@ private void validateBucket(String volumeName, String bucketName)
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
validateBucket(volumeName, bucketName);
byte[] openKey = metadataManager.getOpenKeyBytes(
volumeName, bucketName, keyName, clientID);
try {
validateBucket(volumeName, bucketName);
byte[] openKey = metadataManager.getOpenKeyBytes(
volumeName, bucketName, keyName, clientID);
byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
if (keyData == null) {
LOG.error("Allocate block for a key not in open status in meta store" +
" /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
throw new OMException("Open Key not found",
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
OmKeyInfo keyInfo =
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
keyInfo.getFactor(), omId);
OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
.setBlockID(allocatedBlock.getBlockID())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(scmBlockSize)
.setOffset(0)
.build();
// current version not committed, so new blocks coming now are added to
// the same version
keyInfo.appendNewBlocks(Collections.singletonList(info));
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey,
keyInfo.getProtobuf().toByteArray());
return info;
} finally {
metadataManager.writeLock().unlock();
byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
if (keyData == null) {
LOG.error("Allocate block for a key not in open status in meta store" +
" /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
throw new OMException("Open Key not found",
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
OmKeyInfo keyInfo =
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
keyInfo.getFactor(), omId);
OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
.setBlockID(allocatedBlock.getBlockID())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(scmBlockSize)
.setOffset(0)
.build();
// current version not committed, so new blocks coming now are added to
// the same version
keyInfo.appendNewBlocks(Collections.singletonList(info));
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey,
keyInfo.getProtobuf().toByteArray());
return info;
}
@Override
@ -186,7 +180,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
String bucketName = args.getBucketName();
validateBucket(volumeName, bucketName);
metadataManager.writeLock().lock();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
String keyName = args.getKeyName();
ReplicationFactor factor = args.getFactor();
ReplicationType type = args.getType();
@ -286,17 +280,17 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
throw new OMException(ex.getMessage(),
OMException.ResultCodes.FAILED_KEY_ALLOCATION);
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@Override
public void commitKey(OmKeyArgs args, long clientID) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
validateBucket(volumeName, bucketName);
byte[] openKey = metadataManager.getOpenKeyBytes(volumeName, bucketName,
@ -329,17 +323,17 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException {
throw new OMException(ex.getMessage(),
OMException.ResultCodes.FAILED_KEY_ALLOCATION);
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@Override
public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] keyBytes = metadataManager.getOzoneKeyBytes(
volumeName, bucketName, keyName);
@ -357,7 +351,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
throw new OMException(ex.getMessage(),
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@ -375,7 +369,7 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
ResultCodes.FAILED_INVALID_KEY_NAME);
}
metadataManager.writeLock().lock();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
// fromKeyName should exist
byte[] fromKey = metadataManager.getOzoneKeyBytes(
@ -431,17 +425,17 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
throw new OMException(ex.getMessage(),
ResultCodes.FAILED_KEY_RENAME);
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@Override
public void deleteKey(OmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] objectKey = metadataManager.getOzoneKeyBytes(
volumeName, bucketName, keyName);
@ -470,7 +464,7 @@ public void deleteKey(OmKeyArgs args) throws IOException {
throw new OMException(ex.getMessage(), ex,
ResultCodes.FAILED_KEY_DELETION);
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@ -506,12 +500,8 @@ public List<BlockGroup> getPendingDeletionKeys(final int count)
@Override
public List<BlockGroup> getExpiredOpenKeys() throws IOException {
metadataManager.readLock().lock();
try {
return metadataManager.getExpiredOpenKeys();
} finally {
metadataManager.readLock().unlock();
}
return metadataManager.getExpiredOpenKeys();
}
@Override

View File

@ -26,7 +26,6 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.locks.Lock;
/**
* OM metadata manager interface.
@ -51,18 +50,11 @@ public interface OMMetadataManager {
DBStore getStore();
/**
* Returns the read lock used on Metadata DB.
* Returns the OzoneManagerLock used on Metadata DB.
*
* @return readLock
* @return OzoneManagerLock
*/
Lock readLock();
/**
* Returns the write lock used on Metadata DB.
*
* @return writeLock
*/
Lock writeLock();
OzoneManagerLock getLock();
/**
* Given a volume return the corresponding DB key.

View File

@ -52,9 +52,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
@ -102,9 +99,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
private final DBStore store;
// TODO: Make this lock move into Table instead of *ONE* lock for the whole
// DB.
private final ReadWriteLock lock;
private final OzoneManagerLock lock;
private final long openKeyExpireThresholdMS;
private final Table userTable;
@ -116,7 +111,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
File metaDir = getOzoneMetaDirPath(conf);
this.lock = new ReentrantReadWriteLock();
this.lock = new OzoneManagerLock(conf);
this.openKeyExpireThresholdMS = 1000 * conf.getInt(
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
@ -280,23 +275,13 @@ public byte[] getOpenKeyBytes(String volume, String bucket,
}
/**
* Returns the read lock used on Metadata DB.
* Returns the OzoneManagerLock used on Metadata DB.
*
* @return readLock
* @return OzoneManagerLock
*/
@Override
public Lock readLock() {
return lock.readLock();
}
/**
* Returns the write lock used on Metadata DB.
*
* @return writeLock
*/
@Override
public Lock writeLock() {
return lock.writeLock();
public OzoneManagerLock getLock() {
return lock;
}
/**

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.lock.LockManager;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
/**
* Provides different locks to handle concurrency in OzoneMaster.
* We also maintain lock hierarchy, based on the weight.
*
* <table>
* <tr>
* <td><b> WEIGHT </b></td> <td><b> LOCK </b></td>
* </tr>
* <tr>
* <td> 0 </td> <td> User Lock </td>
* </tr>
* <tr>
* <td> 1 </td> <td> Volume Lock </td>
* </tr>
* <tr>
* <td> 2 </td> <td> Bucket Lock </td>
* </tr>
* </table>
*
* One cannot obtain a lower weight lock while holding a lock with higher
* weight. The other way around is possible. <br>
* <br>
* <p>
* For example:
* <br>
* -> acquireVolumeLock (will work)<br>
* +-> acquireBucketLock (will work)<br>
* +--> acquireUserLock (will throw Exception)<br>
* </p>
* <br>
*
* To acquire a user lock you should not hold any Volume/Bucket lock. Similarly
* to acquire a Volume lock you should not hold any Bucket lock.
*
*/
public final class OzoneManagerLock {
private static final String VOLUME_LOCK = "volumeLock";
private static final String BUCKET_LOCK = "bucketLock";
private final LockManager<String> manager;
// To maintain locks held by current thread.
private final ThreadLocal<Map<String, AtomicInteger>> myLocks =
ThreadLocal.withInitial(() -> ImmutableMap.of(
VOLUME_LOCK, new AtomicInteger(0),
BUCKET_LOCK, new AtomicInteger(0)));
/**
* Creates new OzoneManagerLock instance.
* @param conf Configuration object
*/
public OzoneManagerLock(Configuration conf) {
manager = new LockManager<>(conf);
}
/**
* Acquires user lock on the given resource.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*
* @param user User on which the lock has to be acquired
*/
public void acquireUserLock(String user) {
// Calling thread should not hold any volume or bucket lock.
if (hasAnyVolumeLock() || hasAnyBucketLock()) {
throw new RuntimeException(
"Thread '" + Thread.currentThread().getName() +
"' cannot acquire user lock" +
" while holding volume/bucket lock(s).");
}
manager.lock(OM_USER_PREFIX + user);
}
/**
* Releases the user lock on given resource.
*/
public void releaseUserLock(String user) {
manager.unlock(OM_USER_PREFIX + user);
}
/**
* Acquires volume lock on the given resource.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*
* @param volume Volume on which the lock has to be acquired
*/
public void acquireVolumeLock(String volume) {
// Calling thread should not hold any bucket lock.
if (hasAnyBucketLock()) {
throw new RuntimeException(
"Thread '" + Thread.currentThread().getName() +
"' cannot acquire volume lock while holding bucket lock(s).");
}
manager.lock(OM_KEY_PREFIX + volume);
myLocks.get().get(VOLUME_LOCK).incrementAndGet();
}
/**
* Releases the volume lock on given resource.
*/
public void releaseVolumeLock(String volume) {
manager.unlock(OM_KEY_PREFIX + volume);
myLocks.get().get(VOLUME_LOCK).decrementAndGet();
}
/**
* Acquires bucket lock on the given resource.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*
* @param bucket Bucket on which the lock has to be acquired
*/
public void acquireBucketLock(String volume, String bucket) {
manager.lock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket);
myLocks.get().get(BUCKET_LOCK).incrementAndGet();
}
/**
* Releases the bucket lock on given resource.
*/
public void releaseBucketLock(String volume, String bucket) {
manager.unlock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket);
myLocks.get().get(BUCKET_LOCK).decrementAndGet();
}
/**
* Returns true if the current thread holds any volume lock.
* @return true if current thread holds volume lock, else false
*/
private boolean hasAnyVolumeLock() {
return myLocks.get().get(VOLUME_LOCK).get() != 0;
}
/**
* Returns true if the current thread holds any bucket lock.
* @return true if current thread holds bucket lock, else false
*/
private boolean hasAnyBucketLock() {
return myLocks.get().get(BUCKET_LOCK).get() != 0;
}
}

View File

@ -126,7 +126,8 @@ private void delVolumeFromOwnerList(String volume, String owner,
@Override
public void createVolume(OmVolumeArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
metadataManager.getLock().acquireUserLock(args.getOwnerName());
metadataManager.getLock().acquireVolumeLock(args.getVolume());
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
byte[] volumeInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@ -177,7 +178,8 @@ public void createVolume(OmVolumeArgs args) throws IOException {
throw (IOException) ex;
}
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseVolumeLock(args.getVolume());
metadataManager.getLock().releaseUserLock(args.getOwnerName());
}
}
@ -192,7 +194,8 @@ public void createVolume(OmVolumeArgs args) throws IOException {
public void setOwner(String volume, String owner) throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(owner);
metadataManager.writeLock().lock();
metadataManager.getLock().acquireUserLock(owner);
metadataManager.getLock().acquireVolumeLock(volume);
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@ -235,7 +238,8 @@ public void setOwner(String volume, String owner) throws IOException {
throw (IOException) ex;
}
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseVolumeLock(volume);
metadataManager.getLock().releaseUserLock(owner);
}
}
@ -248,7 +252,7 @@ public void setOwner(String volume, String owner) throws IOException {
*/
public void setQuota(String volume, long quota) throws IOException {
Preconditions.checkNotNull(volume);
metadataManager.writeLock().lock();
metadataManager.getLock().acquireVolumeLock(volume);
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@ -279,7 +283,7 @@ public void setQuota(String volume, long quota) throws IOException {
}
throw ex;
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseVolumeLock(volume);
}
}
@ -291,7 +295,7 @@ public void setQuota(String volume, long quota) throws IOException {
*/
public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
Preconditions.checkNotNull(volume);
metadataManager.readLock().lock();
metadataManager.getLock().acquireVolumeLock(volume);
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@ -310,7 +314,7 @@ public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
}
throw ex;
} finally {
metadataManager.readLock().unlock();
metadataManager.getLock().releaseVolumeLock(volume);
}
}
@ -323,7 +327,15 @@ public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
@Override
public void deleteVolume(String volume) throws IOException {
Preconditions.checkNotNull(volume);
metadataManager.writeLock().lock();
String owner;
metadataManager.getLock().acquireVolumeLock(volume);
try {
owner = getVolumeInfo(volume).getOwnerName();
} finally {
metadataManager.getLock().releaseVolumeLock(volume);
}
metadataManager.getLock().acquireUserLock(owner);
metadataManager.getLock().acquireVolumeLock(volume);
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
@ -359,7 +371,8 @@ public void deleteVolume(String volume) throws IOException {
throw (IOException) ex;
}
} finally {
metadataManager.writeLock().unlock();
metadataManager.getLock().releaseVolumeLock(volume);
metadataManager.getLock().releaseUserLock(owner);
}
}
@ -375,7 +388,7 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(userAcl);
metadataManager.readLock().lock();
metadataManager.getLock().acquireVolumeLock(volume);
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
@ -395,7 +408,7 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
}
throw ex;
} finally {
metadataManager.readLock().unlock();
metadataManager.getLock().releaseVolumeLock(volume);
}
}
@ -405,12 +418,12 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
@Override
public List<OmVolumeArgs> listVolumes(String userName,
String prefix, String startKey, int maxKeys) throws IOException {
metadataManager.readLock().lock();
metadataManager.getLock().acquireUserLock(userName);
try {
return metadataManager.listVolumes(
userName, prefix, startKey, maxKeys);
} finally {
metadataManager.readLock().unlock();
metadataManager.getLock().releaseUserLock(userName);
}
}
}

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Contains test-cases to verify OzoneManagerLock.
*/
public class TestOzoneManagerLock {
@Test(timeout = 1000)
public void testDifferentUserLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireUserLock("userOne");
lock.acquireUserLock("userTwo");
lock.releaseUserLock("userOne");
lock.releaseUserLock("userTwo");
Assert.assertTrue(true);
}
@Test
public void testSameUserLock() throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireUserLock("userOne");
AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
lock.acquireUserLock("userOne");
gotLock.set(true);
lock.releaseUserLock("userOne");
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same user, it will wait.
Assert.assertFalse(gotLock.get());
lock.releaseUserLock("userOne");
// Since we have released the lock, the new thread should have the lock
// now
// Let's give some time for the new thread to run
Thread.sleep(100);
Assert.assertTrue(gotLock.get());
}
@Test(timeout = 1000)
public void testDifferentVolumeLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireVolumeLock("volOne");
lock.acquireVolumeLock("volTwo");
lock.releaseVolumeLock("volOne");
lock.releaseVolumeLock("volTwo");
Assert.assertTrue(true);
}
@Test
public void testSameVolumeLock() throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireVolumeLock("volOne");
AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
lock.acquireVolumeLock("volOne");
gotLock.set(true);
lock.releaseVolumeLock("volOne");
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same user, it will wait.
Assert.assertFalse(gotLock.get());
lock.releaseVolumeLock("volOne");
// Since we have released the lock, the new thread should have the lock
// now
// Let's give some time for the new thread to run
Thread.sleep(100);
Assert.assertTrue(gotLock.get());
}
@Test(timeout = 1000)
public void testDifferentBucketLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireBucketLock("volOne", "bucketOne");
lock.acquireBucketLock("volOne", "bucketTwo");
lock.releaseBucketLock("volOne", "bucketTwo");
lock.releaseBucketLock("volOne", "bucketOne");
Assert.assertTrue(true);
}
@Test
public void testSameBucketLock() throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireBucketLock("volOne", "bucketOne");
AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
lock.acquireBucketLock("volOne", "bucketOne");
gotLock.set(true);
lock.releaseBucketLock("volOne", "bucketOne");
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same user, it will wait.
Assert.assertFalse(gotLock.get());
lock.releaseBucketLock("volOne", "bucketOne");
// Since we have released the lock, the new thread should have the lock
// now
// Let's give some time for the new thread to run
Thread.sleep(100);
Assert.assertTrue(gotLock.get());
}
@Test(timeout = 1000)
public void testVolumeLockAfterUserLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireUserLock("userOne");
lock.acquireVolumeLock("volOne");
lock.releaseVolumeLock("volOne");
lock.releaseUserLock("userOne");
Assert.assertTrue(true);
}
@Test(timeout = 1000)
public void testBucketLockAfterVolumeLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireVolumeLock("volOne");
lock.acquireBucketLock("volOne", "bucketOne");
lock.releaseBucketLock("volOne", "bucketOne");
lock.releaseVolumeLock("volOne");
Assert.assertTrue(true);
}
@Test(timeout = 1000)
public void testBucketLockAfterVolumeLockAfterUserLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireUserLock("userOne");
lock.acquireVolumeLock("volOne");
lock.acquireBucketLock("volOne", "bucketOne");
lock.releaseBucketLock("volOne", "bucketOne");
lock.releaseVolumeLock("volOne");
lock.releaseUserLock("userOne");
Assert.assertTrue(true);
}
@Test
public void testUserLockAfterVolumeLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireVolumeLock("volOne");
try {
lock.acquireUserLock("userOne");
Assert.fail();
} catch (RuntimeException ex) {
String msg =
"cannot acquire user lock while holding volume/bucket lock(s).";
Assert.assertTrue(ex.getMessage().contains(msg));
}
lock.releaseVolumeLock("volOne");
Assert.assertTrue(true);
}
@Test
public void testVolumeLockAfterBucketLock() {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
lock.acquireBucketLock("volOne", "bucketOne");
try {
lock.acquireVolumeLock("volOne");
Assert.fail();
} catch (RuntimeException ex) {
String msg =
"cannot acquire volume lock while holding bucket lock(s).";
Assert.assertTrue(ex.getMessage().contains(msg));
}
lock.releaseBucketLock("volOne", "bucketOne");
Assert.assertTrue(true);
}
}