diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index f26ed385c9..95ec4156a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -192,6 +192,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <dependency>
       <groupId>org.fusesource.leveldbjni</groupId>
       <artifactId>leveldbjni-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+      <version>5.5.5</version>
+    </dependency>
     <dependency>
       <groupId>org.bouncycastle</groupId>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
index 415b7883a3..c7df429cc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -126,7 +126,9 @@ public void delete(byte[] key) {
    */
   @Override
   public void close() throws IOException {
-    db.close();
+    if (db != null) {
+      db.close();
+    }
   }
 
   /**
@@ -163,6 +165,7 @@ public DBIterator getIterator() {
 
   @Override
   public void destroy() throws IOException {
+    close();
     JniDBFactory.factory.destroy(dbFile, dbOptions);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
index 81f2d8a5a7..55465490a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.iq80.leveldb.Options;
+import org.rocksdb.BlockBasedTableConfig;
 
 import java.io.File;
 import java.io.IOException;
@@ -82,8 +83,15 @@ public MetadataStore build() throws IOException {
       }
       store = new LevelDBStore(dbFile, options);
     } else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) {
-      // TODO replace with rocksDB impl
-      store = new LevelDBStore(dbFile, new Options());
+      org.rocksdb.Options opts = new org.rocksdb.Options();
+      opts.setCreateIfMissing(createIfMissing);
+
+      if (cacheSize > 0) {
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(cacheSize);
+        opts.setTableFormatConfig(tableConfig);
+      }
+      store = new RocksDBStore(dbFile, opts);
     } else {
       throw new IllegalArgumentException("Invalid argument for "
           + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
new file mode 100644
index 0000000000..b2e5e2a4b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.DbPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.AbstractMap;
+
+/**
+ * RocksDB implementation of ozone metadata store.
+ */
+public class RocksDBStore implements MetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBStore.class);
+
+  private RocksDB db = null;
+  private File dbLocation;
+  private WriteOptions writeOptions;
+  private Options dbOptions;
+
+  public RocksDBStore(File dbFile, Options options) throws IOException {
+    Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
+    RocksDB.loadLibrary();
+    dbOptions = options;
+    dbLocation = dbFile;
+    writeOptions = new WriteOptions();
+    writeOptions.setSync(true);
+    writeOptions.setNoSlowdown(true);
+    try {
+      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath());
+    } catch (RocksDBException e) {
+      throw new IOException("Failed to init RocksDB, db path : "
+          + dbFile.getAbsolutePath(), e);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RocksDB successfully opened.");
+      LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
+      LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
+      LOG.debug("[Option] compactionPriority= {}", options.compactionStyle());
+      LOG.debug("[Option] compressionType= {}", options.compressionType());
+      LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
+      LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize());
+    }
+  }
+
+  private IOException toIOException(String msg, RocksDBException e) {
+    String statusCode = e.getStatus() == null ? "N/A" :
+        e.getStatus().getCodeString();
+    String errMessage = e.getMessage() == null ?
+        "Unknown error" : e.getMessage();
+    String output = msg + "; status : " + statusCode
+        + "; message : " + errMessage;
+    return new IOException(output, e);
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    try {
+      db.put(writeOptions, key, value);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to put key-value to metadata store", e);
+    }
+  }
+
+  @Override
+  public boolean isEmpty() throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      it.seekToFirst();
+      return !it.isValid();
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    try {
+      return db.get(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to get the value for the given key", e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) throws IOException {
+    try {
+      db.delete(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to delete the given key", e);
+    }
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
+    long start = System.currentTimeMillis();
+    if (count < 0) {
+      throw new IllegalArgumentException(
+          "Invalid count given " + count + ", count must be greater than 0");
+    }
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (startKey == null) {
+        it.seekToFirst();
+      } else {
+        if (get(startKey) == null) {
+          throw new IOException("Invalid start key, not found in current db");
+        }
+        it.seek(startKey);
+      }
+      while (it.isValid() && result.size() < count) {
+        byte[] currentKey = it.key();
+        byte[] currentValue = it.value();
+
+        it.prev();
+        final byte[] prevKey = it.isValid() ? it.key() : null;
+
+        it.seek(currentKey);
+        it.next();
+        final byte[] nextKey = it.isValid() ? it.key() : null;
+
+        if (filters == null || Arrays.asList(filters).stream()
+            .allMatch(entry -> entry.filterKey(prevKey,
+                currentKey, nextKey))) {
+          result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+              currentValue));
+        }
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+      long end = System.currentTimeMillis();
+      long timeConsumed = end - start;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+            + " result length is {}.", timeConsumed, result.size());
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public void writeBatch(BatchOperation operation)
+      throws IOException {
+    List<BatchOperation.SingleOperation> operations =
+        operation.getOperations();
+    if (!operations.isEmpty()) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        for (BatchOperation.SingleOperation opt : operations) {
+          switch (opt.getOpt()) {
+          case DELETE:
+            writeBatch.remove(opt.getKey());
+            break;
+          case PUT:
+            writeBatch.put(opt.getKey(), opt.getValue());
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid operation "
+                + opt.getOpt());
+          }
+        }
+        db.write(writeOptions, writeBatch);
+      } catch (RocksDBException e) {
+        throw toIOException("Batch write operation failed", e);
+      }
+    }
+  }
+
+  @Override
+  public void compactDB() throws IOException {
+    if (db != null) {
+      try {
+        db.compactRange();
+      } catch (RocksDBException e) {
+        throw toIOException("Failed to compact db", e);
+      }
+    }
+  }
+
+  private void deleteQuietly(File fileOrDir) {
+    if (fileOrDir != null && fileOrDir.exists()) {
+      try {
+        FileUtils.forceDelete(fileOrDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    // Make sure db is closed.
+    close();
+
+    // There is no destroyDB java API available;
+    // equivalently, we can delete all db directories.
+    deleteQuietly(dbLocation);
+    deleteQuietly(new File(dbOptions.dbLogDir()));
+    deleteQuietly(new File(dbOptions.walDir()));
+    List<DbPath> dbPaths = dbOptions.dbPaths();
+    if (dbPaths != null) {
+      dbPaths.forEach(dbPath -> {
+        deleteQuietly(new File(dbPath.toString()));
+      });
+    }
+  }
+
+  @Override
+  public ImmutablePair<byte[], byte[]> peekAround(int offset,
+      byte[] from) throws IOException, IllegalArgumentException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from == null) {
+        it.seekToFirst();
+      } else {
+        it.seek(from);
+      }
+      if (!it.isValid()) {
+        throw new IOException("Key not found");
+      }
+
+      switch (offset) {
+      case 0:
+        break;
+      case 1:
+        it.next();
+        break;
+      case -1:
+        it.prev();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Position can only be -1, 0 or 1, but found " + offset);
+      }
+      return it.isValid() ? new ImmutablePair<>(it.key(), it.value()) : null;
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from != null) {
+        it.seek(from);
+      } else {
+        it.seekToFirst();
+      }
+      while (it.isValid()) {
+        if (!consumer.consume(it.key(), it.value())) {
+          break;
+        }
+        it.next();
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (db != null) {
+      db.close();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index bd29ce47fb..133bcb26eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -598,7 +598,7 @@
   </property>
   <property>
     <name>ozone.metastore.impl</name>
-    <value>LevelDB</value>
+    <value>RocksDB</value>
    <description>
      Ozone metadata store implementation. Ozone metadata are well distributed
      to multiple services such as ksm, scm. They are stored in some local
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
index 0000e50dba..143ea947f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
@@ -22,6 +22,7 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
@@ -33,6 +34,8 @@
 import org.junit.Test;
 import org.junit.Assert;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,15 +43,35 @@
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.UUID;
+import java.util.Collection;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.runners.Parameterized.*;
 
 /**
  * Test class for ozone metadata store.
  */
+@RunWith(Parameterized.class)
 public class TestMetadataStore {
 
+  private final String storeImpl;
+
+  public TestMetadataStore(String metadataImpl) {
+    this.storeImpl = metadataImpl;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
+    });
+  }
+
   private MetadataStore store;
   private File testDir;
-
   private final static int MAX_GETRANGE_LENGTH = 100;
 
   @Rule
@@ -56,11 +79,11 @@ public class TestMetadataStore {
 
   @Before
   public void init() throws IOException {
-    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
+    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase());
 
     Configuration conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
-        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
 
     store = MetadataStoreBuilder.newBuilder()
         .setConf(conf)
@@ -293,4 +316,72 @@ public void testInvalidStartKey() throws IOException {
     expectedException.expectMessage("Invalid start key");
     store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
   }
+
+  @Test
+  public void testDestroyDB() throws IOException {
+    // create a new DB to test db destroy
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-toDestroy");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    dbStore.put(getBytes("key1"), getBytes("value1"));
+    dbStore.put(getBytes("key2"), getBytes("value2"));
+
+    Assert.assertFalse(dbStore.isEmpty());
+    Assert.assertTrue(dbDir.exists());
+    Assert.assertTrue(dbDir.listFiles().length > 0);
+
+    dbStore.destroy();
+
+    Assert.assertFalse(dbDir.exists());
+  }
+
+  @Test
+  public void testBatchWrite() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-batchWrite");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    List<String> expectedResult = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      dbStore.put(getBytes("batch-" + i), getBytes("batch-value-" + i));
+      expectedResult.add("batch-" + i);
+    }
+
+    BatchOperation batch = new BatchOperation();
+    batch.delete(getBytes("batch-2"));
+    batch.delete(getBytes("batch-3"));
+    batch.delete(getBytes("batch-4"));
+    batch.put(getBytes("batch-new-2"), getBytes("batch-new-value-2"));
+
+    expectedResult.remove("batch-2");
+    expectedResult.remove("batch-3");
+    expectedResult.remove("batch-4");
+    expectedResult.add("batch-new-2");
+
+    dbStore.writeBatch(batch);
+
+    Iterator<String> it = expectedResult.iterator();
+    AtomicInteger count = new AtomicInteger(0);
+    dbStore.iterate(null, (key, value) -> {
+      count.incrementAndGet();
+      return it.hasNext() && it.next().equals(getString(key));
+    });
+
+    Assert.assertEquals(8, count.get());
+  }
 }
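
Usage note (illustrative sketch, not part of the patch): the snippet below shows how a caller would obtain the new RocksDB-backed store through MetadataStoreBuilder, mirroring the test setup above. The class name MetadataStoreUsageSketch, the db file path, and the key/value contents are made up for illustration; the OzoneConfiguration/OzoneConfigKeys package locations are assumed to match the tree this patch applies to.

import java.io.File;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;

public class MetadataStoreUsageSketch {
  public static void main(String[] args) throws Exception {
    // Select the RocksDB implementation explicitly; with this patch,
    // ozone-default.xml also makes RocksDB the default value.
    Configuration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB);

    // The db file location below is illustrative only.
    MetadataStore store = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setCreateIfMissing(true)
        .setDbFile(new File("/tmp/example-metadata-db"))
        .build();
    try {
      store.put("key1".getBytes(StandardCharsets.UTF_8),
          "value1".getBytes(StandardCharsets.UTF_8));
      byte[] value = store.get("key1".getBytes(StandardCharsets.UTF_8));
      System.out.println(new String(value, StandardCharsets.UTF_8));
    } finally {
      store.close();
    }
  }
}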