HDDS-1986. Fix listkeys API. (#1588)
This commit is contained in:
parent
31e0122f4d
commit
9c72bf4621
@ -23,6 +23,9 @@
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
@ -653,7 +656,12 @@ public List<OmBucketInfo> listBuckets(final String volumeName,
|
||||
@Override
|
||||
public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
|
||||
String startKey, String keyPrefix, int maxKeys) throws IOException {
|
||||
|
||||
List<OmKeyInfo> result = new ArrayList<>();
|
||||
if (maxKeys <= 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
if (Strings.isNullOrEmpty(volumeName)) {
|
||||
throw new OMException("Volume name is required.",
|
||||
ResultCodes.VOLUME_NOT_FOUND);
|
||||
@ -688,19 +696,56 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
|
||||
seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
|
||||
}
|
||||
int currentCount = 0;
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
|
||||
getKeyTable()
|
||||
.iterator()) {
|
||||
KeyValue<String, OmKeyInfo> kv = keyIter.seek(seekKey);
|
||||
while (currentCount < maxKeys && keyIter.hasNext()) {
|
||||
kv = keyIter.next();
|
||||
// Skip the Start key if needed.
|
||||
if (kv != null && skipStartKey && kv.getKey().equals(seekKey)) {
|
||||
continue;
|
||||
|
||||
|
||||
TreeMap<String, OmKeyInfo> cacheKeyMap = new TreeMap<>();
|
||||
Set<String> deletedKeySet = new TreeSet<>();
|
||||
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
|
||||
keyTable.cacheIterator();
|
||||
|
||||
//TODO: We can avoid this iteration if table cache has stored entries in
|
||||
// treemap. Currently HashMap is used in Cache. HashMap get operation is an
|
||||
// constant time operation, where as for treeMap get is log(n).
|
||||
// So if we move to treemap, the get operation will be affected. As get
|
||||
// is frequent operation on table. So, for now in list we iterate cache map
|
||||
// and construct treeMap which match with keyPrefix and are greater than or
|
||||
// equal to startKey. Later we can revisit this, if list operation
|
||||
// is becoming slow.
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry< CacheKey<String>, CacheValue<OmKeyInfo>> entry =
|
||||
iterator.next();
|
||||
|
||||
String key = entry.getKey().getCacheKey();
|
||||
OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
|
||||
// Making sure that entry in cache is not for delete key request.
|
||||
|
||||
if (omKeyInfo != null) {
|
||||
if (key.startsWith(seekPrefix) && key.compareTo(seekKey) >= 0) {
|
||||
cacheKeyMap.put(key, omKeyInfo);
|
||||
}
|
||||
} else {
|
||||
deletedKeySet.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Get maxKeys from DB if it has.
|
||||
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
|
||||
keyIter = getKeyTable().iterator()) {
|
||||
KeyValue< String, OmKeyInfo > kv;
|
||||
keyIter.seek(seekKey);
|
||||
// we need to iterate maxKeys + 1 here because if skipStartKey is true,
|
||||
// we should skip that entry and return the result.
|
||||
while (currentCount < maxKeys + 1 && keyIter.hasNext()) {
|
||||
kv = keyIter.next();
|
||||
if (kv != null && kv.getKey().startsWith(seekPrefix)) {
|
||||
result.add(kv.getValue());
|
||||
|
||||
// Entry should not be marked for delete, consider only those
|
||||
// entries.
|
||||
if(!deletedKeySet.contains(kv.getKey())) {
|
||||
cacheKeyMap.put(kv.getKey(), kv.getValue());
|
||||
currentCount++;
|
||||
}
|
||||
} else {
|
||||
// The SeekPrefix does not match any more, we can break out of the
|
||||
// loop.
|
||||
@ -708,6 +753,28 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Finally DB entries and cache entries are merged, then return the count
|
||||
// of maxKeys from the sorted map.
|
||||
currentCount = 0;
|
||||
|
||||
for (Map.Entry<String, OmKeyInfo> cacheKey : cacheKeyMap.entrySet()) {
|
||||
if (cacheKey.getKey().equals(seekKey) && skipStartKey) {
|
||||
continue;
|
||||
}
|
||||
|
||||
result.add(cacheKey.getValue());
|
||||
currentCount++;
|
||||
|
||||
if (currentCount == maxKeys) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear map and set.
|
||||
cacheKeyMap.clear();
|
||||
deletedKeySet.clear();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -19,9 +19,11 @@
|
||||
import com.google.common.base.Optional;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
|
||||
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -188,4 +190,228 @@ private void addBucketsToCache(String volumeName, String bucketName) {
|
||||
new CacheValue<>(Optional.of(omBucketInfo), 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListKeys() throws Exception {
|
||||
|
||||
String volumeNameA = "volumeA";
|
||||
String volumeNameB = "volumeB";
|
||||
String ozoneBucket = "ozoneBucket";
|
||||
String hadoopBucket = "hadoopBucket";
|
||||
|
||||
|
||||
// Create volumes and buckets.
|
||||
TestOMRequestUtils.addVolumeToDB(volumeNameA, omMetadataManager);
|
||||
TestOMRequestUtils.addVolumeToDB(volumeNameB, omMetadataManager);
|
||||
addBucketsToCache(volumeNameA, ozoneBucket);
|
||||
addBucketsToCache(volumeNameB, hadoopBucket);
|
||||
|
||||
|
||||
String prefixKeyA = "key-a";
|
||||
String prefixKeyB = "key-b";
|
||||
TreeSet<String> keysASet = new TreeSet<>();
|
||||
TreeSet<String> keysBSet = new TreeSet<>();
|
||||
for (int i=1; i<= 100; i++) {
|
||||
if (i % 2 == 0) {
|
||||
keysASet.add(
|
||||
prefixKeyA + i);
|
||||
addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
|
||||
} else {
|
||||
keysBSet.add(
|
||||
prefixKeyB + i);
|
||||
addKeysToOM(volumeNameA, hadoopBucket, prefixKeyB + i, i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TreeSet<String> keysAVolumeBSet = new TreeSet<>();
|
||||
TreeSet<String> keysBVolumeBSet = new TreeSet<>();
|
||||
for (int i=1; i<= 100; i++) {
|
||||
if (i % 2 == 0) {
|
||||
keysAVolumeBSet.add(
|
||||
prefixKeyA + i);
|
||||
addKeysToOM(volumeNameB, ozoneBucket, prefixKeyA + i, i);
|
||||
} else {
|
||||
keysBVolumeBSet.add(
|
||||
prefixKeyB + i);
|
||||
addKeysToOM(volumeNameB, hadoopBucket, prefixKeyB + i, i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// List all keys which have prefix "key-a"
|
||||
List<OmKeyInfo> omKeyInfoList =
|
||||
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
|
||||
null, prefixKeyA, 100);
|
||||
|
||||
Assert.assertEquals(omKeyInfoList.size(), 50);
|
||||
|
||||
for (OmKeyInfo omKeyInfo : omKeyInfoList) {
|
||||
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
|
||||
prefixKeyA));
|
||||
}
|
||||
|
||||
|
||||
String startKey = prefixKeyA + 10;
|
||||
omKeyInfoList =
|
||||
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
|
||||
startKey, prefixKeyA, 100);
|
||||
|
||||
Assert.assertEquals(keysASet.tailSet(
|
||||
startKey).size() - 1, omKeyInfoList.size());
|
||||
|
||||
startKey = prefixKeyA + 38;
|
||||
omKeyInfoList =
|
||||
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
|
||||
startKey, prefixKeyA, 100);
|
||||
|
||||
Assert.assertEquals(keysASet.tailSet(
|
||||
startKey).size() - 1, omKeyInfoList.size());
|
||||
|
||||
for (OmKeyInfo omKeyInfo : omKeyInfoList) {
|
||||
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
|
||||
prefixKeyA));
|
||||
Assert.assertFalse(omKeyInfo.getBucketName().equals(
|
||||
prefixKeyA + 38));
|
||||
}
|
||||
|
||||
|
||||
|
||||
omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
|
||||
null, prefixKeyB, 100);
|
||||
|
||||
Assert.assertEquals(omKeyInfoList.size(), 50);
|
||||
|
||||
for (OmKeyInfo omKeyInfo : omKeyInfoList) {
|
||||
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
|
||||
prefixKeyB));
|
||||
}
|
||||
|
||||
// Try to get keys by count 10, like that get all keys in the
|
||||
// volumeB/ozoneBucket with "key-a".
|
||||
startKey = null;
|
||||
TreeSet<String> expectedKeys = new TreeSet<>();
|
||||
for (int i=0; i<5; i++) {
|
||||
|
||||
omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
|
||||
startKey, prefixKeyB, 10);
|
||||
|
||||
Assert.assertEquals(10, omKeyInfoList.size());
|
||||
|
||||
for (OmKeyInfo omKeyInfo : omKeyInfoList) {
|
||||
expectedKeys.add(omKeyInfo.getKeyName());
|
||||
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
|
||||
prefixKeyB));
|
||||
startKey = omKeyInfo.getKeyName();
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(expectedKeys, keysBVolumeBSet);
|
||||
|
||||
|
||||
// As now we have iterated all 50 buckets, calling next time should
|
||||
// return empty list.
|
||||
omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
|
||||
startKey, prefixKeyB, 10);
|
||||
|
||||
Assert.assertEquals(omKeyInfoList.size(), 0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListKeysWithFewDeleteEntriesInCache() throws Exception {
|
||||
String volumeNameA = "volumeA";
|
||||
String ozoneBucket = "ozoneBucket";
|
||||
|
||||
// Create volumes and bucket.
|
||||
TestOMRequestUtils.addVolumeToDB(volumeNameA, omMetadataManager);
|
||||
|
||||
addBucketsToCache(volumeNameA, ozoneBucket);
|
||||
|
||||
String prefixKeyA = "key-a";
|
||||
TreeSet<String> keysASet = new TreeSet<>();
|
||||
TreeSet<String> deleteKeySet = new TreeSet<>();
|
||||
|
||||
|
||||
for (int i=1; i<= 100; i++) {
|
||||
if (i % 2 == 0) {
|
||||
keysASet.add(
|
||||
prefixKeyA + i);
|
||||
addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
|
||||
} else {
|
||||
addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
|
||||
String key = omMetadataManager.getOzoneKey(volumeNameA,
|
||||
ozoneBucket, prefixKeyA + i);
|
||||
// Mark as deleted in cache.
|
||||
omMetadataManager.getKeyTable().addCacheEntry(
|
||||
new CacheKey<>(key),
|
||||
new CacheValue<>(Optional.absent(), 100L));
|
||||
deleteKeySet.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Now list keys which match with prefixKeyA.
|
||||
List<OmKeyInfo> omKeyInfoList =
|
||||
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
|
||||
null, prefixKeyA, 100);
|
||||
|
||||
// As in total 100, 50 are marked for delete. It should list only 50 keys.
|
||||
Assert.assertEquals(50, omKeyInfoList.size());
|
||||
|
||||
TreeSet<String> expectedKeys = new TreeSet<>();
|
||||
|
||||
for (OmKeyInfo omKeyInfo : omKeyInfoList) {
|
||||
expectedKeys.add(omKeyInfo.getKeyName());
|
||||
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyA));
|
||||
}
|
||||
|
||||
Assert.assertEquals(expectedKeys, keysASet);
|
||||
|
||||
|
||||
// Now get key count by 10.
|
||||
String startKey = null;
|
||||
expectedKeys = new TreeSet<>();
|
||||
for (int i=0; i<5; i++) {
|
||||
|
||||
omKeyInfoList = omMetadataManager.listKeys(volumeNameA, ozoneBucket,
|
||||
startKey, prefixKeyA, 10);
|
||||
|
||||
System.out.println(i);
|
||||
Assert.assertEquals(10, omKeyInfoList.size());
|
||||
|
||||
for (OmKeyInfo omKeyInfo : omKeyInfoList) {
|
||||
expectedKeys.add(omKeyInfo.getKeyName());
|
||||
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
|
||||
prefixKeyA));
|
||||
startKey = omKeyInfo.getKeyName();
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(keysASet, expectedKeys);
|
||||
|
||||
|
||||
// As now we have iterated all 50 buckets, calling next time should
|
||||
// return empty list.
|
||||
omKeyInfoList = omMetadataManager.listKeys(volumeNameA, ozoneBucket,
|
||||
startKey, prefixKeyA, 10);
|
||||
|
||||
Assert.assertEquals(omKeyInfoList.size(), 0);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
private void addKeysToOM(String volumeName, String bucketName,
|
||||
String keyName, int i) throws Exception {
|
||||
|
||||
if (i%2== 0) {
|
||||
TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
|
||||
1000L, HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.ONE, omMetadataManager);
|
||||
} else {
|
||||
TestOMRequestUtils.addKeyToTableCache(volumeName, bucketName, keyName,
|
||||
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE,
|
||||
omMetadataManager);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -120,7 +120,52 @@ public static void addKeyToTable(boolean openKeyTable, String volumeName,
|
||||
OMMetadataManager omMetadataManager) throws Exception {
|
||||
|
||||
|
||||
OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
|
||||
OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName,
|
||||
replicationType, replicationFactor);
|
||||
|
||||
if (openKeyTable) {
|
||||
omMetadataManager.getOpenKeyTable().put(
|
||||
omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
|
||||
clientID), omKeyInfo);
|
||||
} else {
|
||||
omMetadataManager.getKeyTable().put(omMetadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName), omKeyInfo);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key entry to key table cache.
|
||||
* @param volumeName
|
||||
* @param bucketName
|
||||
* @param keyName
|
||||
* @param replicationType
|
||||
* @param replicationFactor
|
||||
* @param omMetadataManager
|
||||
*/
|
||||
@SuppressWarnings("parameterNumber")
|
||||
public static void addKeyToTableCache(String volumeName,
|
||||
String bucketName,
|
||||
String keyName,
|
||||
HddsProtos.ReplicationType replicationType,
|
||||
HddsProtos.ReplicationFactor replicationFactor,
|
||||
OMMetadataManager omMetadataManager) {
|
||||
|
||||
|
||||
OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName,
|
||||
replicationType, replicationFactor);
|
||||
|
||||
omMetadataManager.getKeyTable().addCacheEntry(
|
||||
new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, bucketName,
|
||||
keyName)), new CacheValue<>(Optional.of(omKeyInfo),
|
||||
1L));
|
||||
|
||||
}
|
||||
|
||||
private OmKeyInfo createKeyInfo(String volumeName, String bucketName,
|
||||
String keyName, HddsProtos.ReplicationType replicationType,
|
||||
HddsProtos.ReplicationFactor replicationFactor) {
|
||||
return new OmKeyInfo.Builder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
@ -130,18 +175,9 @@ public static void addKeyToTable(boolean openKeyTable, String volumeName,
|
||||
.setModificationTime(Time.now())
|
||||
.setDataSize(1000L)
|
||||
.setReplicationType(replicationType)
|
||||
.setReplicationFactor(replicationFactor);
|
||||
|
||||
if (openKeyTable) {
|
||||
omMetadataManager.getOpenKeyTable().put(
|
||||
omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
|
||||
clientID), builder.build());
|
||||
} else {
|
||||
omMetadataManager.getKeyTable().put(omMetadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName), builder.build());
|
||||
.setReplicationFactor(replicationFactor).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Create OmKeyInfo.
|
||||
|
Loading…
Reference in New Issue
Block a user