HDDS-1984. Fix listBucket API. (#1555)
This commit is contained in:
parent
4850b3aa86
commit
957253fea6
@ -24,7 +24,7 @@
|
||||
* CacheKey for the RocksDB table.
|
||||
* @param <KEY>
|
||||
*/
|
||||
public class CacheKey<KEY> {
|
||||
public class CacheKey<KEY> implements Comparable<KEY> {
|
||||
|
||||
private final KEY key;
|
||||
|
||||
@ -53,4 +53,13 @@ public boolean equals(Object o) {
|
||||
public int hashCode() {
|
||||
return Objects.hash(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Object o) {
|
||||
if(Objects.equals(key, ((CacheKey<?>)o).key)) {
|
||||
return 0;
|
||||
} else {
|
||||
return key.toString().compareTo((((CacheKey<?>) o).key).toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -47,7 +48,7 @@
|
||||
public class TableCacheImpl<CACHEKEY extends CacheKey,
|
||||
CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
|
||||
|
||||
private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
|
||||
private final Map<CACHEKEY, CACHEVALUE> cache;
|
||||
private final NavigableSet<EpochEntry<CACHEKEY>> epochEntries;
|
||||
private ExecutorService executorService;
|
||||
private CacheCleanupPolicy cleanupPolicy;
|
||||
@ -55,7 +56,14 @@ public class TableCacheImpl<CACHEKEY extends CacheKey,
|
||||
|
||||
|
||||
public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
|
||||
cache = new ConcurrentHashMap<>();
|
||||
|
||||
// As for full table cache only we need elements to be inserted in sorted
|
||||
// manner, so that list will be easy. For other we can go with Hash map.
|
||||
if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
|
||||
cache = new ConcurrentSkipListMap<>();
|
||||
} else {
|
||||
cache = new ConcurrentHashMap<>();
|
||||
}
|
||||
epochEntries = new ConcurrentSkipListSet<>();
|
||||
// Created a singleThreadExecutor, so one cleanup will be running at a
|
||||
// time.
|
||||
|
@ -619,23 +619,31 @@ public List<OmBucketInfo> listBuckets(final String volumeName,
|
||||
}
|
||||
int currentCount = 0;
|
||||
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
|
||||
bucketIter = bucketTable.iterator()) {
|
||||
KeyValue<String, OmBucketInfo> kv = bucketIter.seek(startKey);
|
||||
while (currentCount < maxNumOfBuckets && bucketIter.hasNext()) {
|
||||
kv = bucketIter.next();
|
||||
// Skip the Start Bucket if needed.
|
||||
if (kv != null && skipStartKey &&
|
||||
kv.getKey().equals(startKey)) {
|
||||
|
||||
// For Bucket it is full cache, so we can just iterate in-memory table
|
||||
// cache.
|
||||
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator =
|
||||
bucketTable.cacheIterator();
|
||||
|
||||
|
||||
while (currentCount < maxNumOfBuckets && iterator.hasNext()) {
|
||||
Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry =
|
||||
iterator.next();
|
||||
|
||||
String key = entry.getKey().getCacheKey();
|
||||
OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
|
||||
// Making sure that entry in cache is not for delete bucket request.
|
||||
|
||||
if (omBucketInfo != null) {
|
||||
if (key.equals(startKey) && skipStartKey) {
|
||||
continue;
|
||||
}
|
||||
if (kv != null && kv.getKey().startsWith(seekPrefix)) {
|
||||
result.add(kv.getValue());
|
||||
|
||||
// We should return only the keys, whose keys match with prefix and
|
||||
// the keys after the startBucket.
|
||||
if (key.startsWith(seekPrefix) && key.compareTo(startKey) > 0) {
|
||||
result.add(omBucketInfo);
|
||||
currentCount++;
|
||||
} else {
|
||||
// The SeekPrefix does not match any more, we can break out of the
|
||||
// loop.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,191 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
import com.google.common.base.Optional;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
|
||||
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
|
||||
|
||||
/**
|
||||
* Tests OzoneManager MetadataManager.
|
||||
*/
|
||||
public class TestOmMetadataManager {
|
||||
|
||||
private OMMetadataManager omMetadataManager;
|
||||
private OzoneConfiguration ozoneConfiguration;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
ozoneConfiguration = new OzoneConfiguration();
|
||||
ozoneConfiguration.set(OZONE_OM_DB_DIRS,
|
||||
folder.getRoot().getAbsolutePath());
|
||||
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
|
||||
}
|
||||
@Test
|
||||
public void testListBuckets() throws Exception {
|
||||
|
||||
String volumeName1 = "volumeA";
|
||||
String prefixBucketNameWithOzoneOwner = "ozoneBucket";
|
||||
String prefixBucketNameWithHadoopOwner = "hadoopBucket";
|
||||
|
||||
TestOMRequestUtils.addVolumeToDB(volumeName1, omMetadataManager);
|
||||
|
||||
|
||||
TreeSet<String> volumeABucketsPrefixWithOzoneOwner = new TreeSet<>();
|
||||
TreeSet<String> volumeABucketsPrefixWithHadoopOwner = new TreeSet<>();
|
||||
for (int i=1; i<= 100; i++) {
|
||||
if (i % 2 == 0) {
|
||||
volumeABucketsPrefixWithOzoneOwner.add(
|
||||
prefixBucketNameWithOzoneOwner + i);
|
||||
addBucketsToCache(volumeName1, prefixBucketNameWithOzoneOwner + i);
|
||||
} else {
|
||||
volumeABucketsPrefixWithHadoopOwner.add(
|
||||
prefixBucketNameWithHadoopOwner + i);
|
||||
addBucketsToCache(volumeName1, prefixBucketNameWithHadoopOwner + i);
|
||||
}
|
||||
}
|
||||
|
||||
String volumeName2 = "volumeB";
|
||||
TreeSet<String> volumeBBucketsPrefixWithOzoneOwner = new TreeSet<>();
|
||||
TreeSet<String> volumeBBucketsPrefixWithHadoopOwner = new TreeSet<>();
|
||||
TestOMRequestUtils.addVolumeToDB(volumeName2, omMetadataManager);
|
||||
for (int i=1; i<= 100; i++) {
|
||||
if (i % 2 == 0) {
|
||||
volumeBBucketsPrefixWithOzoneOwner.add(
|
||||
prefixBucketNameWithOzoneOwner + i);
|
||||
addBucketsToCache(volumeName2, prefixBucketNameWithOzoneOwner + i);
|
||||
} else {
|
||||
volumeBBucketsPrefixWithHadoopOwner.add(
|
||||
prefixBucketNameWithHadoopOwner + i);
|
||||
addBucketsToCache(volumeName2, prefixBucketNameWithHadoopOwner + i);
|
||||
}
|
||||
}
|
||||
|
||||
// List all buckets which have prefix ozoneBucket
|
||||
List<OmBucketInfo> omBucketInfoList =
|
||||
omMetadataManager.listBuckets(volumeName1,
|
||||
null, prefixBucketNameWithOzoneOwner, 100);
|
||||
|
||||
Assert.assertEquals(omBucketInfoList.size(), 50);
|
||||
|
||||
for (OmBucketInfo omBucketInfo : omBucketInfoList) {
|
||||
Assert.assertTrue(omBucketInfo.getBucketName().startsWith(
|
||||
prefixBucketNameWithOzoneOwner));
|
||||
}
|
||||
|
||||
|
||||
String startBucket = prefixBucketNameWithOzoneOwner + 10;
|
||||
omBucketInfoList =
|
||||
omMetadataManager.listBuckets(volumeName1,
|
||||
startBucket, prefixBucketNameWithOzoneOwner,
|
||||
100);
|
||||
|
||||
Assert.assertEquals(volumeABucketsPrefixWithOzoneOwner.tailSet(
|
||||
startBucket).size() - 1, omBucketInfoList.size());
|
||||
|
||||
startBucket = prefixBucketNameWithOzoneOwner + 38;
|
||||
omBucketInfoList =
|
||||
omMetadataManager.listBuckets(volumeName1,
|
||||
startBucket, prefixBucketNameWithOzoneOwner,
|
||||
100);
|
||||
|
||||
Assert.assertEquals(volumeABucketsPrefixWithOzoneOwner.tailSet(
|
||||
startBucket).size() - 1, omBucketInfoList.size());
|
||||
|
||||
for (OmBucketInfo omBucketInfo : omBucketInfoList) {
|
||||
Assert.assertTrue(omBucketInfo.getBucketName().startsWith(
|
||||
prefixBucketNameWithOzoneOwner));
|
||||
Assert.assertFalse(omBucketInfo.getBucketName().equals(
|
||||
prefixBucketNameWithOzoneOwner + 10));
|
||||
}
|
||||
|
||||
|
||||
|
||||
omBucketInfoList = omMetadataManager.listBuckets(volumeName2,
|
||||
null, prefixBucketNameWithHadoopOwner, 100);
|
||||
|
||||
Assert.assertEquals(omBucketInfoList.size(), 50);
|
||||
|
||||
for (OmBucketInfo omBucketInfo : omBucketInfoList) {
|
||||
Assert.assertTrue(omBucketInfo.getBucketName().startsWith(
|
||||
prefixBucketNameWithHadoopOwner));
|
||||
}
|
||||
|
||||
// Try to get buckets by count 10, like that get all buckets in the
|
||||
// volumeB with prefixBucketNameWithHadoopOwner.
|
||||
startBucket = null;
|
||||
TreeSet<String> expectedBuckets = new TreeSet<>();
|
||||
for (int i=0; i<5; i++) {
|
||||
|
||||
omBucketInfoList = omMetadataManager.listBuckets(volumeName2,
|
||||
startBucket, prefixBucketNameWithHadoopOwner, 10);
|
||||
|
||||
Assert.assertEquals(omBucketInfoList.size(), 10);
|
||||
|
||||
for (OmBucketInfo omBucketInfo : omBucketInfoList) {
|
||||
expectedBuckets.add(omBucketInfo.getBucketName());
|
||||
Assert.assertTrue(omBucketInfo.getBucketName().startsWith(
|
||||
prefixBucketNameWithHadoopOwner));
|
||||
startBucket = omBucketInfo.getBucketName();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Assert.assertEquals(volumeBBucketsPrefixWithHadoopOwner, expectedBuckets);
|
||||
// As now we have iterated all 50 buckets, calling next time should
|
||||
// return empty list.
|
||||
omBucketInfoList = omMetadataManager.listBuckets(volumeName2,
|
||||
startBucket, prefixBucketNameWithHadoopOwner, 10);
|
||||
|
||||
Assert.assertEquals(omBucketInfoList.size(), 0);
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void addBucketsToCache(String volumeName, String bucketName) {
|
||||
|
||||
OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setStorageType(StorageType.DISK)
|
||||
.setIsVersionEnabled(false)
|
||||
.build();
|
||||
|
||||
omMetadataManager.getBucketTable().addCacheEntry(
|
||||
new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
|
||||
new CacheValue<>(Optional.of(omBucketInfo), 1));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user