diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/BatchOperation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/BatchOperation.java new file mode 100644 index 0000000000..2c65736a70 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/BatchOperation.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.utils.db; + +/** + * Class represents a batch operation, collects multiple db operation. + */ +public interface BatchOperation extends AutoCloseable { + + void close(); +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java index 6947a83c8e..26dbc43dea 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java @@ -19,12 +19,11 @@ package org.apache.hadoop.utils.db; -import org.apache.hadoop.classification.InterfaceStability; -import org.rocksdb.WriteBatch; - import java.io.IOException; import java.util.ArrayList; +import org.apache.hadoop.classification.InterfaceStability; + /** * The DBStore interface provides the ability to create Tables, which store * a specific type of Key-Value pair. Some DB interfaces like LevelDB will not @@ -107,9 +106,21 @@ void move(byte[] sourceKey, byte[] destKey, byte[] value, long getEstimatedKeyCount() throws IOException; /** - * Writes a transaction into the DB using the default write Options. - * @param batch - Batch to write. + * Initialize an atomic batch operation which can hold multiple PUT/DELETE + * operations and committed later in one step. + * + * @return BatchOperation holder which can be used to add or commit batch + * operations. */ - void write(WriteBatch batch) throws IOException; + BatchOperation initBatchOperation(); + + /** + * Commit the batch operations. + * + * @param operation which contains all the required batch operation. + * @throws IOException on Failure. + */ + void commitBatchOperation(BatchOperation operation) throws IOException; + } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java new file mode 100644 index 0000000000..a8b78ed8a9 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.utils.db; + +import java.io.IOException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +/** + * Batch operation implementation for rocks db. + */ +public class RDBBatchOperation implements BatchOperation { + + private final WriteBatch writeBatch; + + public RDBBatchOperation() { + writeBatch = new WriteBatch(); + } + + public void commit(RocksDB db, WriteOptions writeOptions) throws IOException { + try { + db.write(writeOptions, writeBatch); + } catch (RocksDBException e) { + throw new IOException("Unable to write the batch.", e); + } + } + + @Override + public void close() { + writeBatch.close(); + } + + public void delete(ColumnFamilyHandle handle, byte[] key) throws IOException { + try { + writeBatch.delete(handle, key); + } catch (RocksDBException e) { + throw new IOException("Can't record batch delete operation.", e); + } + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) + throws IOException { + try { + writeBatch.put(handle, key, value); + } catch (RocksDBException e) { + throw new IOException("Can't record batch put operation.", e); + } + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java index 24cd96da6b..68c35fa604 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java @@ -237,14 +237,17 @@ public long getEstimatedKeyCount() throws IOException { } @Override - public void write(WriteBatch batch) throws IOException { - try { - db.write(writeOptions, batch); - } catch (RocksDBException e) { - throw toIOException("Unable to write the batch.", e); - } + public BatchOperation initBatchOperation() { + return new RDBBatchOperation(); } + @Override + public void commitBatchOperation(BatchOperation operation) + throws IOException { + ((RDBBatchOperation) operation).commit(db, writeOptions); + } + + @VisibleForTesting protected ObjectName getStatMBeanName() { return statMBeanName; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java index 8cf6b3533f..6b504c2b27 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java @@ -19,19 +19,19 @@ package org.apache.hadoop.utils.db; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + import org.apache.hadoop.hdfs.DFSUtil; + import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.charset.StandardCharsets; - /** * RocksDB implementation of ozone metadata store. */ @@ -79,7 +79,6 @@ public static IOException toIOException(String msg, RocksDBException e) { * * @return ColumnFamilyHandle. */ - @Override public ColumnFamilyHandle getHandle() { return handle; } @@ -96,6 +95,17 @@ public void put(byte[] key, byte[] value) throws IOException { } } + @Override + public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) + throws IOException { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).put(getHandle(), key, value); + } else { + throw new IllegalArgumentException("batch should be RDBBatchOperation"); + } + } + + @Override public boolean isEmpty() throws IOException { try (TableIterator keyIter = iterator()) { @@ -124,32 +134,15 @@ public void delete(byte[] key) throws IOException { } @Override - public void writeBatch(WriteBatch operation) throws IOException { - try { - db.write(writeOptions, operation); - } catch (RocksDBException e) { - throw toIOException("Batch write operation failed", e); + public void deleteWithBatch(BatchOperation batch, byte[] key) + throws IOException { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).delete(getHandle(), key); + } else { + throw new IllegalArgumentException("batch should be RDBBatchOperation"); } - } -// @Override -// public void iterate(byte[] from, EntryConsumer consumer) -// throws IOException { -// -// try (RocksIterator it = db.newIterator(handle)) { -// if (from != null) { -// it.seek(from); -// } else { -// it.seekToFirst(); -// } -// while (it.isValid()) { -// if (!consumer.consume(it.key(), it.value())) { -// break; -// } -// it.next(); -// } -// } -// } + } @Override public TableIterator iterator() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java index 3942585292..8bbd247e7b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java @@ -19,12 +19,10 @@ package org.apache.hadoop.utils.db; -import org.apache.hadoop.classification.InterfaceStability; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.WriteBatch; - import java.io.IOException; +import org.apache.hadoop.classification.InterfaceStability; + /** * Interface for key-value store that stores ozone metadata. Ozone metadata is * stored as key value pairs, both key and value are arbitrary byte arrays. Each @@ -42,6 +40,16 @@ public interface Table extends AutoCloseable { */ void put(byte[] key, byte[] value) throws IOException; + /** + * Puts a key-value pair into the store as part of a bath operation. + * + * @param batch the batch operation + * @param key metadata key + * @param value metadata value + */ + void putWithBatch(BatchOperation batch, byte[] key, byte[] value) + throws IOException; + /** * @return true if the metadata store is empty. * @throws IOException on Failure @@ -67,19 +75,13 @@ public interface Table extends AutoCloseable { void delete(byte[] key) throws IOException; /** - * Return the Column Family handle. TODO: This leaks an RockDB abstraction - * into Ozone code, cleanup later. + * Deletes a key from the metadata store as part of a batch operation. * - * @return ColumnFamilyHandle + * @param batch the batch operation + * @param key metadata key + * @throws IOException on Failure */ - ColumnFamilyHandle getHandle(); - - /** - * A batch of PUT, DELETE operations handled as a single atomic write. - * - * @throws IOException write fails - */ - void writeBatch(WriteBatch operation) throws IOException; + void deleteWithBatch(BatchOperation batch, byte[] key) throws IOException; /** * Returns the iterator for this metadata store. diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java index 9524f5f4c8..ed5fcb9c73 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java @@ -19,8 +19,16 @@ package org.apache.hadoop.utils.db; -import org.apache.commons.lang3.RandomStringUtils; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.hadoop.hdfs.DFSUtil; + +import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -32,14 +40,6 @@ import org.rocksdb.RocksDB; import org.rocksdb.Statistics; import org.rocksdb.StatsLevel; -import org.rocksdb.WriteBatch; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; /** * Tests for RocksDBTable Store. @@ -89,7 +89,7 @@ public void toIOException() { public void getHandle() throws Exception { try (Table testTable = rdbStore.getTable("First")) { Assert.assertNotNull(testTable); - Assert.assertNotNull(testTable.getHandle()); + Assert.assertNotNull(((RDBTable) testTable).getHandle()); } } @@ -149,18 +149,46 @@ public void delete() throws Exception { } @Test - public void writeBatch() throws Exception { - WriteBatch batch = new WriteBatch(); - try (Table testTable = rdbStore.getTable("Fifth")) { + public void batchPut() throws Exception { + try (Table testTable = rdbStore.getTable("Fifth"); + BatchOperation batch = rdbStore.initBatchOperation()) { + //given byte[] key = RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); byte[] value = RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); - batch.put(testTable.getHandle(), key, value); - testTable.writeBatch(batch); + Assert.assertNull(testTable.get(key)); + + //when + testTable.putWithBatch(batch, key, value); + rdbStore.commitBatchOperation(batch); + + //then Assert.assertNotNull(testTable.get(key)); } - batch.close(); + } + + @Test + public void batchDelete() throws Exception { + try (Table testTable = rdbStore.getTable("Fifth"); + BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + byte[] key = + RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); + byte[] value = + RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); + testTable.put(key, value); + Assert.assertNotNull(testTable.get(key)); + + + //when + testTable.deleteWithBatch(batch, key); + rdbStore.commitBatchOperation(batch); + + //then + Assert.assertNull(testTable.get(key)); + } } private static boolean consume(Table.KeyValue keyValue) { @@ -195,4 +223,4 @@ public void forEachAndIterator() throws Exception { } } } -} \ No newline at end of file +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java index 41a876bd78..b7af87c60f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java @@ -16,7 +16,11 @@ */ package org.apache.hadoop.ozone.om; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdfs.DFSUtil; @@ -28,19 +32,16 @@ import org.apache.hadoop.utils.BackgroundTaskQueue; import org.apache.hadoop.utils.BackgroundTaskResult; import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.utils.db.BatchOperation; +import org.apache.hadoop.utils.db.DBStore; import org.apache.hadoop.utils.db.Table; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is the background service to delete keys. Scan the metadata of om @@ -151,20 +152,23 @@ public BackgroundTaskResult call() throws Exception { private int deleteAllKeys(List results) throws RocksDBException, IOException { Table deletedTable = manager.getMetadataManager().getDeletedTable(); + + DBStore store = manager.getMetadataManager().getStore(); + // Put all keys to delete in a single transaction and call for delete. int deletedCount = 0; - try (WriteBatch writeBatch = new WriteBatch()) { + try (BatchOperation writeBatch = store.initBatchOperation()) { for (DeleteBlockGroupResult result : results) { if (result.isSuccess()) { // Purge key from OM DB. - writeBatch.delete(deletedTable.getHandle(), + deletedTable.deleteWithBatch(writeBatch, DFSUtil.string2Bytes(result.getObjectKey())); LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); deletedCount++; } } // Write a single transaction for delete. - manager.getMetadataManager().getStore().write(writeBatch); + store.commitBatchOperation(writeBatch); } return deletedCount; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 645b39fb61..2c0b5436ad 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -16,7 +16,12 @@ */ package org.apache.hadoop.ozone.om; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -34,22 +39,14 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.KeyLocationList; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BackgroundService; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; +import org.apache.hadoop.utils.db.BatchOperation; +import org.apache.hadoop.utils.db.DBStore; +import com.google.common.base.Preconditions; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; @@ -58,8 +55,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of keyManager. @@ -436,13 +435,14 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue)); newKeyInfo.setKeyName(toKeyName); newKeyInfo.updateModifcationTime(); - try (WriteBatch batch = new WriteBatch()) { - batch.delete(metadataManager.getKeyTable().getHandle(), fromKey); - batch.put(metadataManager.getKeyTable().getHandle(), toKey, + DBStore store = metadataManager.getStore(); + try (BatchOperation batch = store.initBatchOperation()) { + metadataManager.getKeyTable().deleteWithBatch(batch, fromKey); + metadataManager.getKeyTable().putWithBatch(batch, toKey, newKeyInfo.getProtobuf().toByteArray()); - metadataManager.getStore().write(batch); + store.commitBatchOperation(batch); } - } catch (RocksDBException | IOException ex) { + } catch (IOException ex) { LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}", volumeName, bucketName, fromKeyName, toKeyName, ex); throw new OMException(ex.getMessage(), diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java index b948acc2e5..36d245baac 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java @@ -16,34 +16,29 @@ */ package org.apache.hadoop.ozone.om; -import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.OzoneAclInfo; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.VolumeList; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.VolumeInfo; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.RocksDBStore; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_USER_MAX_VOLUME_DEFAULT; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_USER_MAX_VOLUME; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.RocksDBStore; +import org.apache.hadoop.utils.db.BatchOperation; + +import com.google.common.base.Preconditions; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * OM volume management code. @@ -69,7 +64,7 @@ public VolumeManagerImpl(OMMetadataManager metadataManager, // Helpers to add and delete volume from user list private void addVolumeToOwnerList(String volume, String owner, - WriteBatch batchOperation) throws RocksDBException, IOException { + BatchOperation batchOperation) throws IOException { // Get the volume list byte[] dbUserKey = metadataManager.getUserKey(owner); byte[] volumeList = metadataManager.getUserTable().get(dbUserKey); @@ -89,12 +84,12 @@ private void addVolumeToOwnerList(String volume, String owner, prevVolList.add(volume); VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - batchOperation.put(metadataManager.getUserTable().getHandle(), + metadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey, newVolList.toByteArray()); } private void delVolumeFromOwnerList(String volume, String owner, - WriteBatch batch) throws RocksDBException, IOException { + BatchOperation batch) throws RocksDBException, IOException { // Get the volume list byte[] dbUserKey = metadataManager.getUserKey(owner); byte[] volumeList = metadataManager.getUserTable().get(dbUserKey); @@ -110,11 +105,11 @@ private void delVolumeFromOwnerList(String volume, String owner, // Remove the volume from the list prevVolList.remove(volume); if (prevVolList.size() == 0) { - batch.delete(metadataManager.getUserTable().getHandle(), dbUserKey); + metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey); } else { VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - batch.put(metadataManager.getUserTable().getHandle(), + metadataManager.getUserTable().putWithBatch(batch, dbUserKey, newVolList.toByteArray()); } } @@ -138,7 +133,8 @@ public void createVolume(OmVolumeArgs args) throws IOException { throw new OMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS); } - try(WriteBatch batch = new WriteBatch()) { + try (BatchOperation batch = metadataManager.getStore() + .initBatchOperation()) { // Write the vol info List metadataList = new ArrayList<>(); for (Map.Entry entry : @@ -157,23 +153,19 @@ public void createVolume(OmVolumeArgs args) throws IOException { .addAllVolumeAcls(aclList) .setCreationTime(Time.now()) .build(); - batch.put(metadataManager.getVolumeTable().getHandle(), + metadataManager.getVolumeTable().putWithBatch(batch, dbVolumeKey, newVolumeInfo.toByteArray()); // Add volume to user list addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); - metadataManager.getStore().write(batch); + metadataManager.getStore().commitBatchOperation(batch); } LOG.debug("created volume:{} user:{}", args.getVolume(), args.getOwnerName()); - } catch (RocksDBException | IOException ex) { + } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Volume creation failed for user:{} volume:{}", args.getOwnerName(), args.getVolume(), ex); - } - if(ex instanceof RocksDBException) { - throw RocksDBStore.toIOException("Volume creation failed.", - (RocksDBException) ex); } else { throw (IOException) ex; } @@ -209,7 +201,8 @@ public void setOwner(String volume, String owner) throws IOException { OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo); Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - try(WriteBatch batch = new WriteBatch()) { + try (BatchOperation batch = metadataManager.getStore() + .initBatchOperation()) { delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); addVolumeToOwnerList(volume, owner, batch); @@ -222,9 +215,9 @@ public void setOwner(String volume, String owner) throws IOException { .build(); VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); - batch.put(metadataManager.getVolumeTable().getHandle(), + metadataManager.getVolumeTable().putWithBatch(batch, dbVolumeKey, newVolumeInfo.toByteArray()); - metadataManager.getStore().write(batch); + metadataManager.getStore().commitBatchOperation(batch); } } catch (RocksDBException | IOException ex) { if (!(ex instanceof OMException)) { @@ -356,11 +349,11 @@ public void deleteVolume(String volume) throws IOException { Preconditions.checkState(volume.equals(volumeInfo.getVolume())); // delete the volume from the owner list // as well as delete the volume entry - try(WriteBatch batch = new WriteBatch()) { + try (BatchOperation batch = metadataManager.getStore() + .initBatchOperation()) { delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch); - batch.delete(metadataManager.getVolumeTable().getHandle(), - dbVolumeKey); - metadataManager.getStore().write(batch); + metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey); + metadataManager.getStore().commitBatchOperation(batch); } } catch (RocksDBException| IOException ex) { if (!(ex instanceof OMException)) {