HDDS-1499. OzoneManager Cache. (#798)
This commit is contained in:
parent
a36274d699
commit
0d1d7c86ec
@ -44,6 +44,7 @@ public interface DBStore extends AutoCloseable {
|
|||||||
*/
|
*/
|
||||||
Table<byte[], byte[]> getTable(String name) throws IOException;
|
Table<byte[], byte[]> getTable(String name) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets an existing TableStore with implicit key/value conversion.
|
* Gets an existing TableStore with implicit key/value conversion.
|
||||||
*
|
*
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
|
||||||
import org.rocksdb.ColumnFamilyHandle;
|
import org.rocksdb.ColumnFamilyHandle;
|
||||||
@ -33,9 +34,12 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RocksDB implementation of ozone metadata store.
|
* RocksDB implementation of ozone metadata store. This class should be only
|
||||||
|
* used as part of TypedTable as it's underlying implementation to access the
|
||||||
|
* metadata store content. All other user's using Table should use TypedTable.
|
||||||
*/
|
*/
|
||||||
public class RDBTable implements Table<byte[], byte[]> {
|
@InterfaceAudience.Private
|
||||||
|
class RDBTable implements Table<byte[], byte[]> {
|
||||||
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
@ -52,7 +56,7 @@ public class RDBTable implements Table<byte[], byte[]> {
|
|||||||
* @param handle - ColumnFamily Handle.
|
* @param handle - ColumnFamily Handle.
|
||||||
* @param writeOptions - RocksDB write Options.
|
* @param writeOptions - RocksDB write Options.
|
||||||
*/
|
*/
|
||||||
public RDBTable(RocksDB db, ColumnFamilyHandle handle,
|
RDBTable(RocksDB db, ColumnFamilyHandle handle,
|
||||||
WriteOptions writeOptions) {
|
WriteOptions writeOptions) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
this.handle = handle;
|
this.handle = handle;
|
||||||
|
@ -21,8 +21,10 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheKey;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheValue;
|
||||||
/**
|
/**
|
||||||
* Interface for key-value store that stores ozone metadata. Ozone metadata is
|
* Interface for key-value store that stores ozone metadata. Ozone metadata is
|
||||||
* stored as key value pairs, both key and value are arbitrary byte arrays. Each
|
* stored as key value pairs, both key and value are arbitrary byte arrays. Each
|
||||||
@ -97,6 +99,28 @@ void putWithBatch(BatchOperation batch, KEY key, VALUE value)
|
|||||||
*/
|
*/
|
||||||
String getName() throws IOException;
|
String getName() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add entry to the table cache.
|
||||||
|
*
|
||||||
|
* If the cacheKey already exists, it will override the entry.
|
||||||
|
* @param cacheKey
|
||||||
|
* @param cacheValue
|
||||||
|
*/
|
||||||
|
default void addCacheEntry(CacheKey<KEY> cacheKey,
|
||||||
|
CacheValue<VALUE> cacheValue) {
|
||||||
|
throw new NotImplementedException("addCacheEntry is not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes all the entries from the table cache which are having epoch value
|
||||||
|
* less
|
||||||
|
* than or equal to specified epoch value.
|
||||||
|
* @param epoch
|
||||||
|
*/
|
||||||
|
default void cleanupCache(long epoch) {
|
||||||
|
throw new NotImplementedException("cleanupCache is not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class used to represent the key and value pair of a db entry.
|
* Class used to represent the key and value pair of a db entry.
|
||||||
*/
|
*/
|
||||||
|
@ -20,6 +20,12 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheKey;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheValue;
|
||||||
|
import org.apache.hadoop.utils.db.cache.PartialTableCache;
|
||||||
|
import org.apache.hadoop.utils.db.cache.TableCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Strongly typed table implementation.
|
* Strongly typed table implementation.
|
||||||
* <p>
|
* <p>
|
||||||
@ -31,13 +37,16 @@
|
|||||||
*/
|
*/
|
||||||
public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
|
public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
|
||||||
|
|
||||||
private Table<byte[], byte[]> rawTable;
|
private final Table<byte[], byte[]> rawTable;
|
||||||
|
|
||||||
private CodecRegistry codecRegistry;
|
private final CodecRegistry codecRegistry;
|
||||||
|
|
||||||
private Class<KEY> keyType;
|
private final Class<KEY> keyType;
|
||||||
|
|
||||||
|
private final Class<VALUE> valueType;
|
||||||
|
|
||||||
|
private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
|
||||||
|
|
||||||
private Class<VALUE> valueType;
|
|
||||||
|
|
||||||
public TypedTable(
|
public TypedTable(
|
||||||
Table<byte[], byte[]> rawTable,
|
Table<byte[], byte[]> rawTable,
|
||||||
@ -47,6 +56,7 @@ public TypedTable(
|
|||||||
this.codecRegistry = codecRegistry;
|
this.codecRegistry = codecRegistry;
|
||||||
this.keyType = keyType;
|
this.keyType = keyType;
|
||||||
this.valueType = valueType;
|
this.valueType = valueType;
|
||||||
|
cache = new PartialTableCache<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -69,8 +79,34 @@ public boolean isEmpty() throws IOException {
|
|||||||
return rawTable.isEmpty();
|
return rawTable.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value mapped to the given key in byte array or returns null
|
||||||
|
* if the key is not found.
|
||||||
|
*
|
||||||
|
* Caller's of this method should use synchronization mechanism, when
|
||||||
|
* accessing. First it will check from cache, if it has entry return the
|
||||||
|
* value, otherwise get from the RocksDB table.
|
||||||
|
*
|
||||||
|
* @param key metadata key
|
||||||
|
* @return VALUE
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public VALUE get(KEY key) throws IOException {
|
public VALUE get(KEY key) throws IOException {
|
||||||
|
// Here the metadata lock will guarantee that cache is not updated for same
|
||||||
|
// key during get key.
|
||||||
|
CacheValue< VALUE > cacheValue = cache.get(new CacheKey<>(key));
|
||||||
|
if (cacheValue == null) {
|
||||||
|
// If no cache for the table or if it does not exist in cache get from
|
||||||
|
// RocksDB table.
|
||||||
|
return getFromTable(key);
|
||||||
|
} else {
|
||||||
|
// We have a value in cache, return the value.
|
||||||
|
return cacheValue.getValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private VALUE getFromTable(KEY key) throws IOException {
|
||||||
byte[] keyBytes = codecRegistry.asRawData(key);
|
byte[] keyBytes = codecRegistry.asRawData(key);
|
||||||
byte[] valueBytes = rawTable.get(keyBytes);
|
byte[] valueBytes = rawTable.get(keyBytes);
|
||||||
return codecRegistry.asObject(valueBytes, valueType);
|
return codecRegistry.asObject(valueBytes, valueType);
|
||||||
@ -106,6 +142,40 @@ public void close() throws Exception {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addCacheEntry(CacheKey<KEY> cacheKey,
|
||||||
|
CacheValue<VALUE> cacheValue) {
|
||||||
|
// This will override the entry if there is already entry for this key.
|
||||||
|
cache.put(cacheKey, cacheValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanupCache(long epoch) {
|
||||||
|
cache.cleanup(epoch);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
TableCache<CacheKey<KEY>, CacheValue<VALUE>> getCache() {
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Table<byte[], byte[]> getRawTable() {
|
||||||
|
return rawTable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CodecRegistry getCodecRegistry() {
|
||||||
|
return codecRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class<KEY> getKeyType() {
|
||||||
|
return keyType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class<VALUE> getValueType() {
|
||||||
|
return valueType;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Key value implementation for strongly typed tables.
|
* Key value implementation for strongly typed tables.
|
||||||
*/
|
*/
|
||||||
|
56
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
vendored
Normal file
56
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
vendored
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CacheKey for the RocksDB table.
|
||||||
|
* @param <KEY>
|
||||||
|
*/
|
||||||
|
public class CacheKey<KEY> {
|
||||||
|
|
||||||
|
private final KEY key;
|
||||||
|
|
||||||
|
public CacheKey(KEY key) {
|
||||||
|
Objects.requireNonNull(key, "Key Should not be null in CacheKey");
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public KEY getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
CacheKey<?> cacheKey = (CacheKey<?>) o;
|
||||||
|
return Objects.equals(key, cacheKey.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(key);
|
||||||
|
}
|
||||||
|
}
|
47
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
vendored
Normal file
47
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
vendored
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CacheValue for the RocksDB Table.
|
||||||
|
* @param <VALUE>
|
||||||
|
*/
|
||||||
|
public class CacheValue<VALUE> {
|
||||||
|
|
||||||
|
private Optional<VALUE> value;
|
||||||
|
// This value is used for evict entries from cache.
|
||||||
|
// This value is set with ratis transaction context log entry index.
|
||||||
|
private long epoch;
|
||||||
|
|
||||||
|
public CacheValue(Optional<VALUE> value, long epoch) {
|
||||||
|
this.value = value;
|
||||||
|
this.epoch = epoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public VALUE getValue() {
|
||||||
|
return value.orNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEpoch() {
|
||||||
|
return epoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
74
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java
vendored
Normal file
74
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java
vendored
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class used which describes epoch entry. This will be used during deletion
|
||||||
|
* entries from cache for partial table cache.
|
||||||
|
* @param <CACHEKEY>
|
||||||
|
*/
|
||||||
|
public class EpochEntry<CACHEKEY> implements Comparable<CACHEKEY> {
|
||||||
|
|
||||||
|
private long epoch;
|
||||||
|
private CACHEKEY cachekey;
|
||||||
|
|
||||||
|
EpochEntry(long epoch, CACHEKEY cachekey) {
|
||||||
|
this.epoch = epoch;
|
||||||
|
this.cachekey = cachekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEpoch() {
|
||||||
|
return epoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CACHEKEY getCachekey() {
|
||||||
|
return cachekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
EpochEntry<?> that = (EpochEntry<?>) o;
|
||||||
|
return epoch == that.epoch && cachekey == that.cachekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(epoch, cachekey);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int compareTo(Object o) {
|
||||||
|
if(this.epoch == ((EpochEntry<?>)o).epoch) {
|
||||||
|
return 0;
|
||||||
|
} else if (this.epoch < ((EpochEntry<?>)o).epoch) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
97
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
vendored
Normal file
97
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
vendored
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache implementation for the table, this cache is partial cache, this will
|
||||||
|
* be cleaned up, after entries are flushed to DB.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
public class PartialTableCache<CACHEKEY extends CacheKey,
|
||||||
|
CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
|
||||||
|
private final TreeSet<EpochEntry<CACHEKEY>> epochEntries;
|
||||||
|
private ExecutorService executorService;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public PartialTableCache() {
|
||||||
|
cache = new ConcurrentHashMap<>();
|
||||||
|
epochEntries = new TreeSet<>();
|
||||||
|
// Created a singleThreadExecutor, so one cleanup will be running at a
|
||||||
|
// time.
|
||||||
|
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
|
||||||
|
.setNameFormat("PartialTableCache Cleanup Thread - %d").build();
|
||||||
|
executorService = Executors.newSingleThreadExecutor(build);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CACHEVALUE get(CACHEKEY cachekey) {
|
||||||
|
return cache.get(cachekey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(CACHEKEY cacheKey, CACHEVALUE value) {
|
||||||
|
cache.put(cacheKey, value);
|
||||||
|
epochEntries.add(new EpochEntry<>(value.getEpoch(), cacheKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup(long epoch) {
|
||||||
|
executorService.submit(() -> evictCache(epoch));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return cache.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void evictCache(long epoch) {
|
||||||
|
EpochEntry<CACHEKEY> currentEntry = null;
|
||||||
|
for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
|
||||||
|
iterator.hasNext();) {
|
||||||
|
currentEntry = iterator.next();
|
||||||
|
CACHEKEY cachekey = currentEntry.getCachekey();
|
||||||
|
CacheValue cacheValue = cache.get(cachekey);
|
||||||
|
if (cacheValue.getEpoch() <= epoch) {
|
||||||
|
cache.remove(cachekey);
|
||||||
|
iterator.remove();
|
||||||
|
} else {
|
||||||
|
// If currentEntry epoch is greater than epoch, we have deleted all
|
||||||
|
// entries less than specified epoch. So, we can break.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
63
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
vendored
Normal file
63
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
vendored
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache used for RocksDB tables.
|
||||||
|
* @param <CACHEKEY>
|
||||||
|
* @param <CACHEVALUE>
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
public interface TableCache<CACHEKEY extends CacheKey,
|
||||||
|
CACHEVALUE extends CacheValue> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the value for the key if it is present, otherwise return null.
|
||||||
|
* @param cacheKey
|
||||||
|
* @return CACHEVALUE
|
||||||
|
*/
|
||||||
|
CACHEVALUE get(CACHEKEY cacheKey);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an entry to the cache, if the key already exists it overrides.
|
||||||
|
* @param cacheKey
|
||||||
|
* @param value
|
||||||
|
*/
|
||||||
|
void put(CACHEKEY cacheKey, CACHEVALUE value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes all the entries from the cache which are having epoch value less
|
||||||
|
* than or equal to specified epoch value. For FullTable Cache this is a
|
||||||
|
* do-nothing operation.
|
||||||
|
* @param epoch
|
||||||
|
*/
|
||||||
|
void cleanup(long epoch);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the size of the cache.
|
||||||
|
* @return size
|
||||||
|
*/
|
||||||
|
int size();
|
||||||
|
}
|
18
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java
vendored
Normal file
18
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java
vendored
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
@ -26,10 +26,14 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.utils.db.Table.KeyValue;
|
import org.apache.hadoop.utils.db.Table.KeyValue;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheKey;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheValue;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -51,7 +55,7 @@ public class TestTypedRDBTableStore {
|
|||||||
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
|
Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
|
||||||
"First", "Second", "Third",
|
"First", "Second", "Third",
|
||||||
"Fourth", "Fifth",
|
"Fourth", "Fifth",
|
||||||
"Sixth");
|
"Sixth", "Seven");
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder folder = new TemporaryFolder();
|
public TemporaryFolder folder = new TemporaryFolder();
|
||||||
private RDBStore rdbStore = null;
|
private RDBStore rdbStore = null;
|
||||||
@ -236,4 +240,80 @@ public void forEachAndIterator() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTypedTableWithCache() throws Exception {
|
||||||
|
int iterCount = 10;
|
||||||
|
try (Table<String, String> testTable = createTypedTable(
|
||||||
|
"Seven")) {
|
||||||
|
|
||||||
|
for (int x = 0; x < iterCount; x++) {
|
||||||
|
String key = Integer.toString(x);
|
||||||
|
String value = Integer.toString(x);
|
||||||
|
testTable.addCacheEntry(new CacheKey<>(key),
|
||||||
|
new CacheValue<>(Optional.of(value),
|
||||||
|
x));
|
||||||
|
}
|
||||||
|
|
||||||
|
// As we have added to cache, so get should return value even if it
|
||||||
|
// does not exist in DB.
|
||||||
|
for (int x = 0; x < iterCount; x++) {
|
||||||
|
Assert.assertEquals(Integer.toString(1),
|
||||||
|
testTable.get(Integer.toString(1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTypedTableWithCacheWithFewDeletedOperationType()
|
||||||
|
throws Exception {
|
||||||
|
int iterCount = 10;
|
||||||
|
try (Table<String, String> testTable = createTypedTable(
|
||||||
|
"Seven")) {
|
||||||
|
|
||||||
|
for (int x = 0; x < iterCount; x++) {
|
||||||
|
String key = Integer.toString(x);
|
||||||
|
String value = Integer.toString(x);
|
||||||
|
if (x % 2 == 0) {
|
||||||
|
testTable.addCacheEntry(new CacheKey<>(key),
|
||||||
|
new CacheValue<>(Optional.of(value), x));
|
||||||
|
} else {
|
||||||
|
testTable.addCacheEntry(new CacheKey<>(key),
|
||||||
|
new CacheValue<>(Optional.absent(),
|
||||||
|
x));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// As we have added to cache, so get should return value even if it
|
||||||
|
// does not exist in DB.
|
||||||
|
for (int x = 0; x < iterCount; x++) {
|
||||||
|
if (x % 2 == 0) {
|
||||||
|
Assert.assertEquals(Integer.toString(x),
|
||||||
|
testTable.get(Integer.toString(x)));
|
||||||
|
} else {
|
||||||
|
Assert.assertNull(testTable.get(Integer.toString(x)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testTable.cleanupCache(5);
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
((TypedTable<String, String>) testTable).getCache().size() == 4,
|
||||||
|
100, 5000);
|
||||||
|
|
||||||
|
|
||||||
|
//Check remaining values
|
||||||
|
for (int x = 6; x < iterCount; x++) {
|
||||||
|
if (x % 2 == 0) {
|
||||||
|
Assert.assertEquals(Integer.toString(x),
|
||||||
|
testTable.get(Integer.toString(x)));
|
||||||
|
} else {
|
||||||
|
Assert.assertNull(testTable.get(Integer.toString(x)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
142
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
vendored
Normal file
142
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
vendored
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class tests partial table cache.
|
||||||
|
*/
|
||||||
|
public class TestPartialTableCache {
|
||||||
|
private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void create() {
|
||||||
|
tableCache = new PartialTableCache<>();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testPartialTableCache() {
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i< 10; i++) {
|
||||||
|
tableCache.put(new CacheKey<>(Integer.toString(i)),
|
||||||
|
new CacheValue<>(Optional.of(Integer.toString(i)), i));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
for (int i=0; i < 10; i++) {
|
||||||
|
Assert.assertEquals(Integer.toString(i),
|
||||||
|
tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// On a full table cache if some one calls cleanup it is a no-op.
|
||||||
|
tableCache.cleanup(4);
|
||||||
|
|
||||||
|
for (int i=5; i < 10; i++) {
|
||||||
|
Assert.assertEquals(Integer.toString(i),
|
||||||
|
tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartialTableCacheParallel() throws Exception {
|
||||||
|
|
||||||
|
int totalCount = 0;
|
||||||
|
CompletableFuture<Integer> future =
|
||||||
|
CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return writeToCache(10, 1, 0);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
fail("writeToCache got interrupt exception");
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
});
|
||||||
|
int value = future.get();
|
||||||
|
Assert.assertEquals(10, value);
|
||||||
|
|
||||||
|
totalCount += value;
|
||||||
|
|
||||||
|
future =
|
||||||
|
CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return writeToCache(10, 11, 100);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
fail("writeToCache got interrupt exception");
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check we have first 10 entries in cache.
|
||||||
|
for (int i=1; i <= 10; i++) {
|
||||||
|
Assert.assertEquals(Integer.toString(i),
|
||||||
|
tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
int deleted = 5;
|
||||||
|
// cleanup first 5 entires
|
||||||
|
tableCache.cleanup(deleted);
|
||||||
|
|
||||||
|
value = future.get();
|
||||||
|
Assert.assertEquals(10, value);
|
||||||
|
|
||||||
|
totalCount += value;
|
||||||
|
|
||||||
|
// We should totalCount - deleted entries in cache.
|
||||||
|
final int tc = totalCount;
|
||||||
|
GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
|
||||||
|
5000);
|
||||||
|
|
||||||
|
// Check if we have remaining entries.
|
||||||
|
for (int i=6; i <= totalCount; i++) {
|
||||||
|
Assert.assertEquals(Integer.toString(i),
|
||||||
|
tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
tableCache.cleanup(10);
|
||||||
|
|
||||||
|
tableCache.cleanup(totalCount);
|
||||||
|
|
||||||
|
// Cleaned up all entries, so cache size should be zero.
|
||||||
|
GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
|
||||||
|
5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int writeToCache(int count, int startVal, long sleep)
|
||||||
|
throws InterruptedException {
|
||||||
|
int counter = 1;
|
||||||
|
while (counter <= count){
|
||||||
|
tableCache.put(new CacheKey<>(Integer.toString(startVal)),
|
||||||
|
new CacheValue<>(Optional.of(Integer.toString(startVal)), startVal));
|
||||||
|
startVal++;
|
||||||
|
counter++;
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
}
|
22
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java
vendored
Normal file
22
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java
vendored
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Tests for the DB Cache Utilities.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.utils.db.cache;
|
@ -59,6 +59,7 @@
|
|||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
|
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
|
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.StringUtil;
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -247,14 +248,13 @@ protected void initializeOmTables() throws IOException {
|
|||||||
userTable =
|
userTable =
|
||||||
this.store.getTable(USER_TABLE, String.class, VolumeList.class);
|
this.store.getTable(USER_TABLE, String.class, VolumeList.class);
|
||||||
checkTableStatus(userTable, USER_TABLE);
|
checkTableStatus(userTable, USER_TABLE);
|
||||||
this.store.getTable(VOLUME_TABLE, String.class,
|
|
||||||
String.class);
|
|
||||||
volumeTable =
|
volumeTable =
|
||||||
this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
|
this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
|
||||||
checkTableStatus(volumeTable, VOLUME_TABLE);
|
checkTableStatus(volumeTable, VOLUME_TABLE);
|
||||||
|
|
||||||
bucketTable =
|
bucketTable =
|
||||||
this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
|
this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
|
||||||
|
|
||||||
checkTableStatus(bucketTable, BUCKET_TABLE);
|
checkTableStatus(bucketTable, BUCKET_TABLE);
|
||||||
|
|
||||||
keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
|
keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
|
||||||
|
Loading…
Reference in New Issue
Block a user