HDFS-12506. Ozone: ListBucket is too slow. Contributed by Weiwei Yang.
commit fd1564b87e
parent bf6f0cd831

@@ -98,10 +98,29 @@ public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
 
   /**
    * KSM LevelDB prefixes.
+   *
+   * KSM DB stores metadata as KV pairs with certain prefixes;
+   * the prefix is used to improve the performance of fetching
+   * related metadata.
+   *
+   * KSM DB Schema:
+   *  ----------------------------------------------------------
+   *  |  KEY                                     |     VALUE   |
+   *  ----------------------------------------------------------
+   *  | $userName                                |  VolumeList |
+   *  ----------------------------------------------------------
+   *  | /#volumeName                             |  VolumeInfo |
+   *  ----------------------------------------------------------
+   *  | /#volumeName/#bucketName                 |  BucketInfo |
+   *  ----------------------------------------------------------
+   *  | /volumeName/bucketName/keyName           |  KeyInfo    |
+   *  ----------------------------------------------------------
+   *  | #deleting#/volumeName/bucketName/keyName |  KeyInfo    |
+   *  ----------------------------------------------------------
    */
-  public static final String KSM_VOLUME_PREFIX = "/";
-  public static final String KSM_BUCKET_PREFIX = KSM_VOLUME_PREFIX;
-  public static final String KSM_KEY_PREFIX = KSM_VOLUME_PREFIX;
+  public static final String KSM_VOLUME_PREFIX = "/#";
+  public static final String KSM_BUCKET_PREFIX = "/#";
+  public static final String KSM_KEY_PREFIX = "/";
   public static final String KSM_USER_PREFIX = "$";
 
   /**
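
Note on the schema above: with the new "/#" prefix, volume and bucket rows sort into their own contiguous key range, while object keys keep the plain "/" delimiter, so bucket rows are never interleaved with key rows. A minimal sketch of the resulting DB keys (hypothetical names, derived only from the schema table above):

    // Illustration only; the literals follow the schema table above.
    String userKey   = "$bilbo";              // -> VolumeList
    String volumeKey = "/#vol1";              // -> VolumeInfo
    String bucketKey = "/#vol1/#bucket1";     // -> BucketInfo
    String objectKey = "/vol1/bucket1/key1";  // -> KeyInfo
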
@@ -21,7 +21,10 @@
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.ksm.helpers.*;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -146,16 +149,16 @@ private String getBucketKeyPrefix(String volume, String bucket) {
   }
 
   private String getKeyKeyPrefix(String volume, String bucket, String key) {
-    String keyStr = getBucketKeyPrefix(volume, bucket);
-    keyStr = Strings.isNullOrEmpty(key) ? keyStr + OzoneConsts.KSM_KEY_PREFIX
-        : keyStr + OzoneConsts.KSM_KEY_PREFIX + key;
-    return keyStr;
+    String keyVB = OzoneConsts.KSM_KEY_PREFIX + volume
+        + OzoneConsts.KSM_KEY_PREFIX + bucket
+        + OzoneConsts.KSM_KEY_PREFIX;
+    return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
   }
 
   @Override
   public byte[] getDBKeyForKey(String volume, String bucket, String key) {
-    String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
+    String keyKeyString = OzoneConsts.KSM_KEY_PREFIX + volume
+        + OzoneConsts.KSM_KEY_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
         + key;
     return DFSUtil.string2Bytes(keyKeyString);
   }
@@ -223,15 +226,14 @@ public void writeBatch(BatchOperation batch) throws IOException {
    * @return true if the volume is empty
    */
  public boolean isVolumeEmpty(String volume) throws IOException {
-    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
+    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX;
     byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
-    // Seek to the root of the volume and look for the next key
     ImmutablePair<byte[], byte[]> volumeRoot =
-        store.peekAround(1, dbVolumeRootKey);
+        store.peekAround(0, dbVolumeRootKey);
     if (volumeRoot != null) {
-      String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey());
-      return !firstBucketKey.startsWith(dbVolumeRootName
-          + OzoneConsts.KSM_BUCKET_PREFIX);
+      return !DFSUtil.bytes2String(volumeRoot.getKey())
+          .startsWith(dbVolumeRootName);
     }
     return true;
   }
@@ -245,13 +247,13 @@ public boolean isVolumeEmpty(String volume) throws IOException {
    */
  public boolean isBucketEmpty(String volume, String bucket)
      throws IOException {
-    String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
+    String keyRootName = OzoneConsts.KSM_KEY_PREFIX + volume
+        + OzoneConsts.KSM_KEY_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX;
     byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
-    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(1, keyRoot);
+    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
     if (firstKey != null) {
       return !DFSUtil.bytes2String(firstKey.getKey())
-          .startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX);
+          .startsWith(keyRootName);
     }
     return true;
   }
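
The two emptiness checks above now probe with peekAround(0, ...), which, after the store change further down, returns the first entry at or after the given key, or null when the store is exhausted; the probe key itself no longer has to exist in the DB. A minimal sketch of the new probe, under that assumption (hypothetical helper name):

    // Sketch only: the prefix range is empty iff the first key at or
    // after the probe position no longer carries the prefix.
    static boolean isEmptyUnderPrefix(MetadataStore store, String prefix)
        throws IOException {
      ImmutablePair<byte[], byte[]> first =
          store.peekAround(0, DFSUtil.string2Bytes(prefix));
      return first == null
          || !DFSUtil.bytes2String(first.getKey()).startsWith(prefix);
    }
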
@@ -276,30 +278,27 @@ public List<KsmBucketInfo> listBuckets(final String volumeName,
     }
 
 
-    // A bucket must start with /volume/bucket_prefix
-    // and exclude keys /volume/bucket_xxx/key_xxx
+    // A bucket starts with /#volume/#bucket_prefix
     MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
       if (currentKey != null) {
         String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix);
         String bucket = DFSUtil.bytes2String(currentKey);
-        return bucket.startsWith(bucketNamePrefix) &&
-            !bucket.replaceFirst(bucketNamePrefix, "")
-            .contains(OzoneConsts.KSM_KEY_PREFIX);
+        return bucket.startsWith(bucketNamePrefix);
       }
       return false;
     };
 
     List<Map.Entry<byte[], byte[]>> rangeResult;
     if (!Strings.isNullOrEmpty(startBucket)) {
-      //Since we are excluding start key from the result,
+      // Since we are excluding start key from the result,
       // the maxNumOfBuckets is incremented.
-      rangeResult = store.getRangeKVs(
+      rangeResult = store.getSequentialRangeKVs(
           getBucketKey(volumeName, startBucket),
           maxNumOfBuckets + 1, filter);
       //Remove start key from result.
       rangeResult.remove(0);
     } else {
-      rangeResult = store.getRangeKVs(null, maxNumOfBuckets, filter);
+      rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
     }
 
     for (Map.Entry<byte[], byte[]> entry : rangeResult) {
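
This is the heart of the ListBucket speedup: all buckets of a volume now share the "/#volume/#" prefix and sort contiguously, so the listing can use getSequentialRangeKVs, which stops at the first non-matching key instead of filtering its way through every remaining entry in the DB. A small usage sketch (hypothetical volume name):

    // The filter only needs the prefix test now; key rows can no
    // longer fall inside the bucket range.
    MetadataKeyFilter filter = (preKey, currentKey, nextKey) ->
        currentKey != null
            && DFSUtil.bytes2String(currentKey).startsWith("/#vol1/#");
    List<Map.Entry<byte[], byte[]>> buckets =
        store.getSequentialRangeKVs(null, 100, filter);
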
@@ -63,6 +63,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_USER_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.KSM_BUCKET_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_VOLUME_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
 import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
@@ -385,7 +386,7 @@ private void convertKSMDB(Path dbPath, Path outPath) throws Exception {
       try {
         insertKSMDB(conn, type, keyString, value);
       } catch (IOException | SQLException ex) {
-        LOG.error("Exception inserting key {}", keyString, ex);
+        LOG.error("Exception inserting key {} type {}", keyString, type, ex);
       }
       return true;
     });
@@ -445,18 +446,11 @@ private void insertKSMDB(Connection conn, KeyType type, String keyName,
   private KeyType getKeyType(String key) {
     if (key.startsWith(KSM_USER_PREFIX)) {
       return KeyType.USER;
-    } else {
-      int count = key.length() - key.replace(KSM_VOLUME_PREFIX, "").length();
-      // NOTE : when delimiter gets changed, will need to change this part
-      if (count == 1) {
-        return KeyType.VOLUME;
-      } else if (count == 2) {
-        return KeyType.BUCKET;
-      } else if (count >= 3) {
-        return KeyType.KEY;
-      } else {
-        return KeyType.UNKNOWN;
-      }
+    } else if (key.startsWith(KSM_VOLUME_PREFIX)) {
+      return key.replaceFirst(KSM_VOLUME_PREFIX, "")
+          .contains(KSM_BUCKET_PREFIX) ? KeyType.BUCKET : KeyType.VOLUME;
+    } else {
+      return KeyType.KEY;
     }
   }
 
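
With the new prefixes the key type falls out of the leading characters alone; counting "/" occurrences is no longer needed. For illustration (hypothetical inputs, following the schema table above):

    // getKeyType("$bilbo")             -> KeyType.USER
    // getKeyType("/#vol1")             -> KeyType.VOLUME (no bucket prefix)
    // getKeyType("/#vol1/#bucket1")    -> KeyType.BUCKET
    // getKeyType("/vol1/bucket1/key1") -> KeyType.KEY (plain "/" prefix)
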
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Map.Entry;
 
 /**
@@ -179,7 +180,7 @@ public ImmutablePair<byte[], byte[]> peekAround(int offset,
       it.seek(from);
     }
     if (!it.hasNext()) {
-      throw new IOException("Key not found");
+      return null;
     }
     switch (offset) {
       case 0:
@@ -260,6 +261,20 @@ public void writeBatch(BatchOperation operation) throws IOException {
     }
   }
 
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, false, filters);
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, true, filters);
+  }
+
   /**
    * Returns a certain range of key value pairs as a list based on a
    * startKey or count. Further a {@link MetadataKeyFilter} can be added to
@@ -287,9 +302,9 @@ public void writeBatch(BatchOperation operation) throws IOException {
    * @throws IOException if an invalid startKey is given or other I/O errors.
    * @throws IllegalArgumentException if count is less than 0.
    */
-  @Override
-  public List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilter... filters) throws IOException {
+  private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, boolean sequential, MetadataKeyFilter... filters)
+      throws IOException {
     List<Entry<byte[], byte[]>> result = new ArrayList<>();
     long start = System.currentTimeMillis();
     if (count < 0) {
@@ -314,10 +329,21 @@ public List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
         byte[] preKey = dbIter.hasPrev() ? dbIter.peekPrev().getKey() : null;
         byte[] nextKey = dbIter.hasNext() ? dbIter.peekNext().getKey() : null;
         Entry<byte[], byte[]> current = dbIter.next();
-        if (filters == null || Arrays.asList(filters).stream()
-            .allMatch(entry -> entry.filterKey(preKey,
-                current.getKey(), nextKey))) {
+        if (filters == null) {
           result.add(current);
+        } else {
+          if (Arrays.asList(filters).stream().allMatch(
+              entry -> entry.filterKey(preKey, current.getKey(), nextKey))) {
+            result.add(current);
+          } else {
+            if (result.size() > 0 && sequential) {
+              // if the caller asks for a sequential range of results,
+              // and we meet a mismatch, abort the iteration from here;
+              // if result is empty, we continue to look for the first match.
+              break;
+            }
+          }
         }
       }
     } finally {
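
The sequential variant changes only what happens on a filter mismatch: once at least one entry has matched, the first mismatch ends the scan, because the matching entries are known to form one contiguous range. A self-contained model of that semantics over plain strings (assumption: keys arrive in sorted order, as they do from the DB iterator):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    static List<String> sequentialMatches(List<String> sortedKeys,
        Predicate<String> filter) {
      List<String> result = new ArrayList<>();
      for (String key : sortedKeys) {
        if (filter.test(key)) {
          result.add(key);
        } else if (!result.isEmpty()) {
          break;  // first mismatch after a match: the range has ended
        }         // before any match: keep scanning for the range start
      }
      return result;
    }
    // ["a1", "a2", "a3", "b2"] with endsWith("2") -> ["a2"]; a plain
    // filtered scan would have returned ["a2", "b2"].
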
@@ -98,6 +98,27 @@ List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
       int count, MetadataKeyFilter... filters)
       throws IOException, IllegalArgumentException;
 
+  /**
+   * This method is very similar to
+   * {@link #getRangeKVs(byte[], int, MetadataKeyFilter...)}; the only
+   * difference is that it returns a sequential range
+   * of elements based on the filters. While iterating the elements,
+   * if it meets an entry that does not pass the filter, iteration stops
+   * there without looking for the next match. If no filter is given,
+   * this method behaves just like
+   * {@link #getRangeKVs(byte[], int, MetadataKeyFilter...)}.
+   *
+   * @param startKey a start key.
+   * @param count max number of entries to return.
+   * @param filters customized one or more {@link MetadataKeyFilter}.
+   * @return a list of entries found in the database.
+   * @throws IOException
+   * @throws IllegalArgumentException
+   */
+  List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException;
+
   /**
    * A batch of PUT, DELETE operations handled as a single atomic write.
    *
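
A typical caller pages through such a range by passing the last key it saw as the next startKey, asking for one extra entry, and dropping the duplicate; listBuckets above uses exactly this idiom. A sketch (hypothetical variable names):

    List<Map.Entry<byte[], byte[]>> page = store.getSequentialRangeKVs(
        lastSeenKey, pageSize + 1, prefixFilter);
    if (!page.isEmpty()) {
      page.remove(0);  // the startKey itself is returned first; drop it
    }
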
@@ -133,6 +133,20 @@ public void delete(byte[] key) throws IOException {
   public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
       int count, MetadataKeyFilters.MetadataKeyFilter... filters)
       throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, false, filters);
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, true, filters);
+  }
+
+  private List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, boolean sequential,
+      MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
     List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
     long start = System.currentTimeMillis();
     if (count < 0) {
@@ -161,11 +175,23 @@ public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
         it.next();
         final byte[] nextKey = it.isValid() ? it.key() : null;
 
-        if (filters == null || Arrays.asList(filters).stream()
-            .allMatch(entry -> entry.filterKey(prevKey,
-                currentKey, nextKey))) {
+        if (filters == null) {
           result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
               currentValue));
+        } else {
+          if (Arrays.asList(filters).stream()
+              .allMatch(entry -> entry.filterKey(prevKey,
+                  currentKey, nextKey))) {
+            result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+                currentValue));
+          } else {
+            if (result.size() > 0 && sequential) {
+              // if the caller asks for a sequential range of results,
+              // and we meet a mismatch, abort the iteration from here;
+              // if result is empty, we continue to look for the first match.
+              break;
+            }
+          }
         }
       }
     } finally {
@@ -261,7 +287,7 @@ public ImmutablePair<byte[], byte[]> peekAround(int offset,
       it.seek(from);
     }
     if (!it.isValid()) {
-      throw new IOException("Key not found");
+      return null;
     }
 
     switch (offset) {
@@ -21,6 +21,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataStore;
@@ -293,6 +294,25 @@ public void testGetRangeKVs() throws IOException {
     Assert.assertEquals("a0", getString(result.get(0).getKey()));
   }
 
+  @Test
+  public void testGetSequentialRangeKVs() throws IOException {
+    MetadataKeyFilter suffixFilter = (preKey, currentKey, nextKey)
+        -> DFSUtil.bytes2String(currentKey).endsWith("2");
+    // Expected to return a2 and b2
+    List<Map.Entry<byte[], byte[]>> result =
+        store.getRangeKVs(null, MAX_GETRANGE_LENGTH, suffixFilter);
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals("a2", DFSUtil.bytes2String(result.get(0).getKey()));
+    Assert.assertEquals("b2", DFSUtil.bytes2String(result.get(1).getKey()));
+
+    // Expected to return just a2, because when it iterates to a3,
+    // the filter no longer matches and it should stop from there.
+    result = store.getSequentialRangeKVs(null,
+        MAX_GETRANGE_LENGTH, suffixFilter);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("a2", DFSUtil.bytes2String(result.get(0).getKey()));
+  }
+
   @Test
   public void testGetRangeLength() throws IOException {
     List<Map.Entry<byte[], byte[]>> result = null;
@@ -80,9 +80,9 @@ private KSMMetadataManager getMetadataManagerMock(String... volumesToCreate)
       @Override
       public Boolean answer(InvocationOnMock invocation)
           throws Throwable {
-        String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX
+        String keyRootName = OzoneConsts.KSM_KEY_PREFIX
             + invocation.getArguments()[0]
-            + OzoneConsts.KSM_BUCKET_PREFIX
+            + OzoneConsts.KSM_KEY_PREFIX
             + invocation.getArguments()[1]
             + OzoneConsts.KSM_KEY_PREFIX;
         Iterator<String> keyIterator = metadataDB.keySet().iterator();
@@ -124,6 +124,19 @@ static void runTestCreateBucket(OzoneRestClient client)
     assertEquals(vol.getOwnerName(), "bilbo");
     assertEquals(vol.getQuota().getUnit(), OzoneQuota.Units.TB);
     assertEquals(vol.getQuota().getSize(), 100);
+
+    // Test creating a bucket with an invalid bucket name;
+    // not using a Rule here because the test method is static.
+    try {
+      String invalidBucketName = "#" + OzoneUtils.getRequestID().toLowerCase();
+      vol.createBucket(invalidBucketName, acls, StorageType.DEFAULT);
+      fail("Expected the bucket creation to fail because the"
+          + " bucket name starts with an invalid char #");
+    } catch (Exception e) {
+      assertTrue(e instanceof OzoneRestClientException);
+      assertTrue(e.getMessage().contains("Bucket or Volume name"
+          + " has an unsupported character : #"));
+    }
   }
 
   @Test
@@ -37,7 +37,6 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.junit.*;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -47,6 +46,11 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.Ignore;
+import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -125,6 +129,20 @@ static void runTestCreateVolume(OzoneRestClient client)
     // verify the key creation time
     assertTrue((OzoneUtils.formatDate(vol.getCreatedOn())
         / 1000) >= (currentTime / 1000));
+
+    // Test creating a volume with an invalid volume name;
+    // not using a Rule here because the test method is static.
+    try {
+      String invalidVolumeName = "#" + OzoneUtils.getRequestID().toLowerCase();
+      client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+      mockClient.createVolume(invalidVolumeName, "bilbo", "100TB");
+      fail("Expected the volume creation to fail because the"
+          + " volume name starts with an invalid char #");
+    } catch (Exception e) {
+      assertTrue(e instanceof OzoneRestClientException);
+      assertTrue(e.getMessage().contains("Bucket or Volume name"
+          + " has an unsupported character : #"));
+    }
   }
 
   @Test
@@ -239,7 +257,7 @@ static void runTestListVolumePagination(OzoneRestClient client)
       prevKey = ovols.get(ovols.size() - 1);
       pagecount++;
     }
-    Assert.assertEquals(volCount / step, pagecount);
+    assertEquals(volCount / step, pagecount);
   }
 
   // TODO: remove @Ignore below once the problem has been resolved.
@@ -275,7 +293,7 @@ static void runTestListAllVolumes(OzoneRestClient client)
     }
     // because we are querying an existing ozone store, there will
     // be volumes created by other tests too. So we should get more page counts.
-    Assert.assertEquals(volCount / step, pagecount);
+    assertEquals(volCount / step, pagecount);
   }
 
   @Test
@@ -382,37 +400,32 @@ private static List<CloseableHttpClient> mockHttpClients(
   private static void verifyHttpConnectionClosed(
       List<CloseableHttpClient> mockedHttpClients) {
     final AtomicInteger totalCalled = new AtomicInteger();
-    Assert.assertTrue(mockedHttpClients.stream().allMatch(
-        closeableHttpClient -> {
-          boolean clientUsed = false;
-          try {
-            verify(closeableHttpClient, times(1))
-                .execute(Mockito.any());
-            totalCalled.incrementAndGet();
-            clientUsed = true;
-          } catch (Throwable e) {
-            // There might be some redundant instances in mockedHttpClients,
-            // it is allowed that a client is not used.
-            return true;
-          }
+    assertTrue(mockedHttpClients.stream().allMatch(closeableHttpClient -> {
+      boolean clientUsed = false;
+      try {
+        verify(closeableHttpClient, times(1)).execute(Mockito.any());
+        totalCalled.incrementAndGet();
+        clientUsed = true;
+      } catch (Throwable e) {
+        // There might be some redundant instances in mockedHttpClients,
+        // it is allowed that a client is not used.
+        return true;
+      }
 
       if (clientUsed) {
         try {
           // If a client is used, ensure the close function is called.
-          verify(closeableHttpClient,
-              times(1)).close();
+          verify(closeableHttpClient, times(1)).close();
           return true;
         } catch (IOException e) {
           return false;
         }
       } else {
         return true;
       }
     }));
-    System.out.println("Successful connections "
-        + totalCalled.get());
-    Assert.assertTrue(
-        "The mocked http client should be called at least once.",
+    System.out.println("Successful connections " + totalCalled.get());
+    assertTrue("The mocked http client should be called at least once.",
         totalCalled.get() > 0);
   }
 }