HDDS-1624 : Refactor operations inside the bucket lock in OM key write. (#882)
This commit is contained in:
parent
580b639908
commit
1a78794227
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.utils;
|
||||
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
|
||||
/**
|
||||
* This class uses system current time milliseconds to generate unique id.
|
||||
*/
|
||||
public final class UniqueId {
|
||||
/*
|
||||
* When we represent time in milliseconds using 'long' data type,
|
||||
* the LSB bits are used. Currently we are only using 44 bits (LSB),
|
||||
* 20 bits (MSB) are not used.
|
||||
* We will exhaust this 44 bits only when we are in year 2525,
|
||||
* until then we can safely use this 20 bits (MSB) for offset to generate
|
||||
* unique id within millisecond.
|
||||
*
|
||||
* Year : Mon Dec 31 18:49:04 IST 2525
|
||||
* TimeInMillis: 17545641544247
|
||||
* Binary Representation:
|
||||
* MSB (20 bits): 0000 0000 0000 0000 0000
|
||||
* LSB (44 bits): 1111 1111 0101 0010 1001 1011 1011 0100 1010 0011 0111
|
||||
*
|
||||
* We have 20 bits to run counter, we should exclude the first bit (MSB)
|
||||
* as we don't want to deal with negative values.
|
||||
* To be on safer side we will use 'short' data type which is of length
|
||||
* 16 bits and will give us 65,536 values for offset.
|
||||
*
|
||||
*/
|
||||
|
||||
private static volatile short offset = 0;
|
||||
|
||||
/**
|
||||
* Private constructor so that no one can instantiate this class.
|
||||
*/
|
||||
private UniqueId() {}
|
||||
|
||||
/**
|
||||
* Calculate and returns next unique id based on System#currentTimeMillis.
|
||||
*
|
||||
* @return unique long value
|
||||
*/
|
||||
public static synchronized long next() {
|
||||
long utcTime = HddsUtils.getUtcTime();
|
||||
if ((utcTime & 0xFFFF000000000000L) == 0) {
|
||||
return utcTime << Short.SIZE | (offset++ & 0x0000FFFF);
|
||||
}
|
||||
throw new RuntimeException("Got invalid UTC time," +
|
||||
" cannot generate unique Id. UTC Time: " + utcTime);
|
||||
}
|
||||
}
|
@ -119,6 +119,16 @@ public boolean isEmpty() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExist(byte[] key) throws IOException {
|
||||
try {
|
||||
return db.get(handle, key) != null;
|
||||
} catch (RocksDBException e) {
|
||||
throw toIOException(
|
||||
"Error in accessing DB. ", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(byte[] key) throws IOException {
|
||||
try {
|
||||
|
@ -58,6 +58,16 @@ void putWithBatch(BatchOperation batch, KEY key, VALUE value)
|
||||
*/
|
||||
boolean isEmpty() throws IOException;
|
||||
|
||||
/**
|
||||
* Check if a given key exists in Metadata store.
|
||||
* (Optimization to save on data deserialization)
|
||||
* A lock on the key / bucket needs to be acquired before invoking this API.
|
||||
* @param key metadata key
|
||||
* @return true if the metadata store contains a key.
|
||||
* @throws IOException on Failure
|
||||
*/
|
||||
boolean isExist(KEY key) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the value mapped to the given key in byte array or returns null
|
||||
* if the key is not found.
|
||||
|
@ -79,6 +79,13 @@ public boolean isEmpty() throws IOException {
|
||||
return rawTable.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExist(KEY key) throws IOException {
|
||||
CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
|
||||
return (cacheValue != null && cacheValue.getValue() != null) ||
|
||||
rawTable.isExist(codecRegistry.asRawData(key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value mapped to the given key in byte array or returns null
|
||||
* if the key is not found.
|
||||
|
@ -51,7 +51,7 @@ public class TestRDBTableStore {
|
||||
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
|
||||
"First", "Second", "Third",
|
||||
"Fourth", "Fifth",
|
||||
"Sixth");
|
||||
"Sixth", "Seventh");
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
private RDBStore rdbStore = null;
|
||||
@ -228,4 +228,23 @@ public void forEachAndIterator() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsExist() throws Exception {
|
||||
try (Table<byte[], byte[]> testTable = rdbStore.getTable("Seventh")) {
|
||||
byte[] key =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
testTable.put(key, value);
|
||||
Assert.assertTrue(testTable.isExist(key));
|
||||
|
||||
testTable.delete(key);
|
||||
Assert.assertFalse(testTable.isExist(key));
|
||||
|
||||
byte[] invalidKey =
|
||||
RandomStringUtils.random(5).getBytes(StandardCharsets.UTF_8);
|
||||
Assert.assertFalse(testTable.isExist(invalidKey));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ public class TestTypedRDBTableStore {
|
||||
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
|
||||
"First", "Second", "Third",
|
||||
"Fourth", "Fifth",
|
||||
"Sixth", "Seven");
|
||||
"Sixth", "Seven", "Eighth");
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
private RDBStore rdbStore = null;
|
||||
@ -316,4 +316,39 @@ public void testTypedTableWithCacheWithFewDeletedOperationType()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsExist() throws Exception {
|
||||
try (Table<String, String> testTable = createTypedTable(
|
||||
"Eighth")) {
|
||||
String key =
|
||||
RandomStringUtils.random(10);
|
||||
String value = RandomStringUtils.random(10);
|
||||
testTable.put(key, value);
|
||||
Assert.assertTrue(testTable.isExist(key));
|
||||
|
||||
String invalidKey = key + RandomStringUtils.random(1);
|
||||
Assert.assertFalse(testTable.isExist(invalidKey));
|
||||
|
||||
testTable.delete(key);
|
||||
Assert.assertFalse(testTable.isExist(key));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsExistCache() throws Exception {
|
||||
try (Table<String, String> testTable = createTypedTable(
|
||||
"Eighth")) {
|
||||
String key =
|
||||
RandomStringUtils.random(10);
|
||||
String value = RandomStringUtils.random(10);
|
||||
testTable.addCacheEntry(new CacheKey<>(key),
|
||||
new CacheValue<>(Optional.of(value), 1L));
|
||||
Assert.assertTrue(testTable.isExist(key));
|
||||
|
||||
testTable.addCacheEntry(new CacheKey<>(key),
|
||||
new CacheValue<>(Optional.absent(), 1L));
|
||||
Assert.assertFalse(testTable.isExist(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,7 +26,6 @@
|
||||
import javax.management.ObjectName;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.client.ContainerBlockID;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
@ -46,6 +45,7 @@
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.utils.UniqueId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -354,47 +354,4 @@ public static Logger getLogger() {
|
||||
/**
|
||||
* This class uses system current time milliseconds to generate unique id.
|
||||
*/
|
||||
public static final class UniqueId {
|
||||
/*
|
||||
* When we represent time in milliseconds using 'long' data type,
|
||||
* the LSB bits are used. Currently we are only using 44 bits (LSB),
|
||||
* 20 bits (MSB) are not used.
|
||||
* We will exhaust this 44 bits only when we are in year 2525,
|
||||
* until then we can safely use this 20 bits (MSB) for offset to generate
|
||||
* unique id within millisecond.
|
||||
*
|
||||
* Year : Mon Dec 31 18:49:04 IST 2525
|
||||
* TimeInMillis: 17545641544247
|
||||
* Binary Representation:
|
||||
* MSB (20 bits): 0000 0000 0000 0000 0000
|
||||
* LSB (44 bits): 1111 1111 0101 0010 1001 1011 1011 0100 1010 0011 0111
|
||||
*
|
||||
* We have 20 bits to run counter, we should exclude the first bit (MSB)
|
||||
* as we don't want to deal with negative values.
|
||||
* To be on safer side we will use 'short' data type which is of length
|
||||
* 16 bits and will give us 65,536 values for offset.
|
||||
*
|
||||
*/
|
||||
|
||||
private static volatile short offset = 0;
|
||||
|
||||
/**
|
||||
* Private constructor so that no one can instantiate this class.
|
||||
*/
|
||||
private UniqueId() {}
|
||||
|
||||
/**
|
||||
* Calculate and returns next unique id based on System#currentTimeMillis.
|
||||
*
|
||||
* @return unique long value
|
||||
*/
|
||||
public static synchronized long next() {
|
||||
long utcTime = HddsUtils.getUtcTime();
|
||||
if ((utcTime & 0xFFFF000000000000L) == 0) {
|
||||
return utcTime << Short.SIZE | (offset++ & 0x0000FFFF);
|
||||
}
|
||||
throw new RuntimeException("Got invalid UTC time," +
|
||||
" cannot generate unique Id. UTC Time: " + utcTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -84,6 +84,7 @@
|
||||
.PartKeyInfo;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.apache.hadoop.utils.UniqueId;
|
||||
import org.apache.hadoop.utils.db.BatchOperation;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.CodecRegistry;
|
||||
@ -406,10 +407,29 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
|
||||
String keyName = args.getKeyName();
|
||||
validateBucket(volumeName, bucketName);
|
||||
|
||||
long currentTime = Time.monotonicNowNanos();
|
||||
long currentTime = UniqueId.next();
|
||||
OmKeyInfo keyInfo;
|
||||
String openKey;
|
||||
long openVersion;
|
||||
// NOTE size of a key is not a hard limit on anything, it is a value that
|
||||
// client should expect, in terms of current size of key. If client sets
|
||||
// a value, then this value is used, otherwise, we allocate a single
|
||||
// block which is the current size, if read by the client.
|
||||
final long size = args.getDataSize() >= 0 ?
|
||||
args.getDataSize() : scmBlockSize;
|
||||
final List<OmKeyLocationInfo> locations = new ArrayList<>();
|
||||
|
||||
ReplicationFactor factor = args.getFactor();
|
||||
if (factor == null) {
|
||||
factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
|
||||
}
|
||||
|
||||
ReplicationType type = args.getType();
|
||||
if (type == null) {
|
||||
type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
|
||||
}
|
||||
|
||||
String dbKeyName = metadataManager.getOzoneKey(
|
||||
args.getVolumeName(), args.getBucketName(), args.getKeyName());
|
||||
|
||||
FileEncryptionInfo encInfo;
|
||||
|
||||
@ -417,37 +437,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
|
||||
encInfo = getFileEncryptionInfo(bucketInfo);
|
||||
// NOTE size of a key is not a hard limit on anything, it is a value that
|
||||
// client should expect, in terms of current size of key. If client sets
|
||||
// a value, then this value is used, otherwise, we allocate a single
|
||||
// block which is the current size, if read by the client.
|
||||
long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
|
||||
List<OmKeyLocationInfo> locations = new ArrayList<>();
|
||||
if (args.getIsMultipartKey()) {
|
||||
keyInfo = prepareMultipartKeyInfo(args, size, locations, encInfo);
|
||||
//TODO args.getMetadata
|
||||
} else {
|
||||
keyInfo = prepareKeyInfo(args, size, locations, encInfo);
|
||||
}
|
||||
|
||||
openVersion = keyInfo.getLatestVersionLocations().getVersion();
|
||||
openKey = metadataManager.getOpenKey(
|
||||
volumeName, bucketName, keyName, currentTime);
|
||||
if (metadataManager.getOpenKeyTable().get(openKey) != null) {
|
||||
// This should not happen. If this condition is satisfied, it means
|
||||
// that we have generated a same openKeyId (i.e. currentTime) for two
|
||||
// different client who are trying to write the same key at the same
|
||||
// time. The chance of this happening is very, very minimal.
|
||||
|
||||
// Do we really need this check? Can we avoid this to gain some
|
||||
// minor performance improvement?
|
||||
LOG.warn("Cannot allocate key. The generated open key id is already" +
|
||||
"used for the same key which is currently being written.");
|
||||
throw new OMException("Cannot allocate key. Not able to get a valid" +
|
||||
"open key id.", ResultCodes.KEY_ALLOCATION_ERROR);
|
||||
}
|
||||
LOG.debug("Key {} allocated in volume {} bucket {}",
|
||||
keyName, volumeName, bucketName);
|
||||
keyInfo = prepareKeyInfo(args, dbKeyName, size, locations, encInfo);
|
||||
} catch (OMException e) {
|
||||
throw e;
|
||||
} catch (IOException ex) {
|
||||
@ -457,7 +447,14 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
|
||||
} finally {
|
||||
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
|
||||
}
|
||||
|
||||
if (keyInfo == null) {
|
||||
// the key does not exist, create a new object, the new blocks are the
|
||||
// version 0
|
||||
keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
|
||||
}
|
||||
openVersion = keyInfo.getLatestVersionLocations().getVersion();
|
||||
LOG.debug("Key {} allocated in volume {} bucket {}",
|
||||
keyName, volumeName, bucketName);
|
||||
allocateBlockInKey(keyInfo, args.getDataSize(), currentTime);
|
||||
return new OpenKeySession(currentTime, keyInfo, openVersion);
|
||||
}
|
||||
@ -485,33 +482,21 @@ private void allocateBlockInKey(OmKeyInfo keyInfo, long size, long sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
private OmKeyInfo prepareKeyInfo(OmKeyArgs args, long size,
|
||||
private OmKeyInfo prepareKeyInfo(
|
||||
OmKeyArgs keyArgs, String dbKeyName, long size,
|
||||
List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
|
||||
throws IOException {
|
||||
ReplicationFactor factor = args.getFactor();
|
||||
ReplicationType type = args.getType();
|
||||
OmKeyInfo keyInfo;
|
||||
// If user does not specify a replication strategy or
|
||||
// replication factor, OM will use defaults.
|
||||
if (factor == null) {
|
||||
factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
|
||||
}
|
||||
if (type == null) {
|
||||
type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
|
||||
}
|
||||
String objectKey = metadataManager.getOzoneKey(
|
||||
args.getVolumeName(), args.getBucketName(), args.getKeyName());
|
||||
keyInfo = metadataManager.getKeyTable().get(objectKey);
|
||||
if (keyInfo != null) {
|
||||
OmKeyInfo keyInfo = null;
|
||||
if (keyArgs.getIsMultipartKey()) {
|
||||
keyInfo = prepareMultipartKeyInfo(keyArgs, size, locations, encInfo);
|
||||
//TODO args.getMetadata
|
||||
} else if (metadataManager.getKeyTable().isExist(dbKeyName)) {
|
||||
keyInfo = metadataManager.getKeyTable().get(dbKeyName);
|
||||
// the key already exist, the new blocks will be added as new version
|
||||
// when locations.size = 0, the new version will have identical blocks
|
||||
// as its previous version
|
||||
keyInfo.addNewVersion(locations);
|
||||
keyInfo.setDataSize(size + keyInfo.getDataSize());
|
||||
} else {
|
||||
// the key does not exist, create a new object, the new blocks are the
|
||||
// version 0
|
||||
keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
|
||||
}
|
||||
return keyInfo;
|
||||
}
|
||||
@ -618,13 +603,15 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException {
|
||||
String volumeName = args.getVolumeName();
|
||||
String bucketName = args.getBucketName();
|
||||
String keyName = args.getKeyName();
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
|
||||
String objectKey = metadataManager
|
||||
.getOzoneKey(volumeName, bucketName, keyName);
|
||||
String openKey = metadataManager
|
||||
.getOpenKey(volumeName, bucketName, keyName, clientID);
|
||||
Preconditions.checkNotNull(locationInfoList);
|
||||
try {
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
validateBucket(volumeName, bucketName);
|
||||
String openKey = metadataManager.getOpenKey(volumeName, bucketName,
|
||||
keyName, clientID);
|
||||
String objectKey = metadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName);
|
||||
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
|
||||
if (keyInfo == null) {
|
||||
throw new OMException("Commit a key without corresponding entry " +
|
||||
@ -632,8 +619,6 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException {
|
||||
}
|
||||
keyInfo.setDataSize(args.getDataSize());
|
||||
keyInfo.setModificationTime(Time.now());
|
||||
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
|
||||
Preconditions.checkNotNull(locationInfoList);
|
||||
|
||||
//update the block length for each block
|
||||
keyInfo.updateLocationInfoList(locationInfoList);
|
||||
|
Loading…
Reference in New Issue
Block a user