From ff036e49ff967d5dacf4b2d9d5376e57578ef391 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Sun, 2 Sep 2018 11:47:32 -0700 Subject: [PATCH] HDDS-357. Use DBStore and TableStore for OzoneManager non-background service. Contributed by Nandakumar. --- .../org/apache/hadoop/ozone/OzoneConsts.java | 6 +- .../org/apache/hadoop/utils/RocksDBStore.java | 2 +- .../org/apache/hadoop/utils/db/DBStore.java | 22 + .../org/apache/hadoop/utils/db/RDBStore.java | 26 +- .../src/main/resources/ozone-default.xml | 2 +- .../hadoop/hdds/server/ServerUtils.java | 5 + .../client/io/ChunkGroupOutputStream.java | 4 +- .../ozone/om/helpers/OpenKeySession.java | 6 +- .../om/protocol/OzoneManagerProtocol.java | 11 +- ...ManagerProtocolClientSideTranslatorPB.java | 8 +- .../src/main/proto/OzoneManagerProtocol.proto | 6 +- .../TestCloseContainerHandlingByClient.java | 37 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 4 + .../apache/hadoop/ozone/om/TestOmSQLCli.java | 7 +- .../hadoop/ozone/om/TestOzoneManager.java | 37 +- .../hadoop/ozone/web/client/TestVolume.java | 6 + .../hadoop/ozone/om/BucketManagerImpl.java | 57 +- .../apache/hadoop/ozone/om/KeyManager.java | 6 +- .../hadoop/ozone/om/KeyManagerImpl.java | 282 ++++------ .../hadoop/ozone/om/OMMetadataManager.java | 214 ++++---- .../ozone/om/OmMetadataManagerImpl.java | 509 +++++++++++------- .../apache/hadoop/ozone/om/OzoneManager.java | 209 ++++--- .../hadoop/ozone/om/VolumeManagerImpl.java | 152 +++--- ...ManagerProtocolServerSideTranslatorPB.java | 7 +- .../ozone/om/TestBucketManagerImpl.java | 208 +++---- .../apache/hadoop/ozone/scm/cli/SQLCLI.java | 12 +- 26 files changed, 975 insertions(+), 870 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 15366fb836..8ea4d7f390 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -92,7 +92,6 @@ public final class OzoneConsts { public static final String CONTAINER_DB_SUFFIX = "container.db"; public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX; public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX; - public static final String BLOCK_DB = "block.db"; public static final String OPEN_CONTAINERS_DB = "openContainers.db"; public static final String DELETED_BLOCK_DB = "deletedBlock.db"; public static final String OM_DB_NAME = "om.db"; @@ -113,8 +112,6 @@ public static Versioning getVersioning(boolean versioning) { public static final String DELETING_KEY_PREFIX = "#deleting#"; public static final String DELETED_KEY_PREFIX = "#deleted#"; public static final String DELETE_TRANSACTION_KEY_PREFIX = "#delTX#"; - public static final String OPEN_KEY_PREFIX = "#open#"; - public static final String OPEN_KEY_ID_DELIMINATOR = "#"; /** * OM LevelDB prefixes. @@ -138,8 +135,7 @@ public static Versioning getVersioning(boolean versioning) { * | #deleting#/volumeName/bucketName/keyName | KeyInfo | * ---------------------------------------------------------- */ - public static final String OM_VOLUME_PREFIX = "/#"; - public static final String OM_BUCKET_PREFIX = "/#"; + public static final String OM_KEY_PREFIX = "/"; public static final String OM_USER_PREFIX = "$"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java index b243e3d7e3..379d9e9d1d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java @@ -94,7 +94,7 @@ public RocksDBStore(File dbFile, Options options) } } - private IOException toIOException(String msg, RocksDBException e) { + public static IOException toIOException(String msg, RocksDBException e) { String statusCode = e.getStatus() == null ? "N/A" : e.getStatus().getCodeString(); String errMessage = e.getMessage() == null ? "Unknown error" : diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java index a817f4f0c8..6947a83c8e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.utils.db; import org.apache.hadoop.classification.InterfaceStability; +import org.rocksdb.WriteBatch; import java.io.IOException; import java.util.ArrayList; @@ -82,6 +83,22 @@ public interface DBStore extends AutoCloseable { void move(byte[] key, byte[] value, Table source, Table dest) throws IOException; + /** + * Moves a key from the Source Table to the destination Table and updates the + * destination with the new key name and value. + * This is similar to deleting an entry in one table and adding an entry in + * another table, here it is done atomically. + * + * @param sourceKey - Key to move. + * @param destKey - Destination key name. + * @param value - new value to write to the destination table. + * @param source - Source Table. + * @param dest - Destination Table. + * @throws IOException on Failure + */ + void move(byte[] sourceKey, byte[] destKey, byte[] value, + Table source, Table dest) throws IOException; + /** * Returns an estimated count of keys in this DB. * @@ -89,5 +106,10 @@ void move(byte[] key, byte[] value, Table source, Table dest) */ long getEstimatedKeyCount() throws IOException; + /** + * Writes a transaction into the DB using the default write Options. + * @param batch - Batch to write. + */ + void write(WriteBatch batch) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java index 85508d5c90..5078b3e96c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java @@ -189,9 +189,16 @@ public void move(byte[] key, Table source, Table dest) throws IOException { } } + @Override public void move(byte[] key, byte[] value, Table source, Table dest) throws IOException { + move(key, key, value, source, dest); + } + + @Override + public void move(byte[] sourceKey, byte[] destKey, byte[] value, Table source, + Table dest) throws IOException { RDBTable sourceTable; RDBTable destTable; if (source instanceof RDBTable) { @@ -210,13 +217,13 @@ public void move(byte[] key, byte[] value, Table source, + "RocksDBTable."); } try (WriteBatch batch = new WriteBatch()) { - batch.put(destTable.getHandle(), key, value); - batch.delete(sourceTable.getHandle(), key); + batch.put(destTable.getHandle(), destKey, value); + batch.delete(sourceTable.getHandle(), sourceKey); db.write(writeOptions, batch); } catch (RocksDBException rockdbException) { - LOG.error("Move of key failed. Key:{}", DFSUtil.bytes2String(key)); - throw toIOException("Unable to move key: " + DFSUtil.bytes2String(key), - rockdbException); + LOG.error("Move of key failed. Key:{}", DFSUtil.bytes2String(sourceKey)); + throw toIOException("Unable to move key: " + + DFSUtil.bytes2String(sourceKey), rockdbException); } } @@ -229,6 +236,15 @@ public long getEstimatedKeyCount() throws IOException { } } + @Override + public void write(WriteBatch batch) throws IOException { + try { + db.write(writeOptions, batch); + } catch (RocksDBException e) { + throw toIOException("Unable to write the batch.", e); + } + } + @VisibleForTesting protected ObjectName getStatMBeanName() { return statMBeanName; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index d3ec4a58e1..6f296c61d0 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1101,7 +1101,7 @@ hdds.db.profile - DBProfile.SSD + SSD OZONE, OM, PERFORMANCE, REQUIRED This property allows user to pick a configuration that tunes the RocksDB settings for the hardware it is running diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java index a0e78dcc7f..c6d85d8b4f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java @@ -136,4 +136,9 @@ public static File getOzoneMetaDirPath(Configuration conf) { return dirPath; } + public static void setOzoneMetaDirPath(OzoneConfiguration conf, + String path) { + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path); + } + } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 00624d577b..c632df6a6d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -74,7 +74,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final StorageContainerLocationProtocolClientSideTranslatorPB scmClient; private final OmKeyArgs keyArgs; - private final int openID; + private final long openID; private final XceiverClientManager xceiverClientManager; private final int chunkSize; private final String requestID; @@ -115,7 +115,7 @@ public List getStreamEntries() { } @VisibleForTesting - public int getOpenID() { + public long getOpenID() { return openID; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java index bc364e665f..11ee622494 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java @@ -23,14 +23,14 @@ * that servers can recognize this client, and thus know how to close the key. */ public class OpenKeySession { - private final int id; + private final long id; private final OmKeyInfo keyInfo; // the version of the key when it is being opened in this session. // a block that has a create version equals to open version means it will // be committed only when this open session is closed. private long openVersion; - public OpenKeySession(int id, OmKeyInfo info, long version) { + public OpenKeySession(long id, OmKeyInfo info, long version) { this.id = id; this.keyInfo = info; this.openVersion = version; @@ -44,7 +44,7 @@ public OmKeyInfo getKeyInfo() { return keyInfo; } - public int getId() { + public long getId() { return id; } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index b7a099d028..edb260a108 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -148,7 +148,7 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName) * @param clientID the client identification * @throws IOException */ - void commitKey(OmKeyArgs args, int clientID) throws IOException; + void commitKey(OmKeyArgs args, long clientID) throws IOException; /** * Allocate a new block, it is assumed that the client is having an open key @@ -159,7 +159,7 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName) * @return an allocated block * @throws IOException */ - OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) throws IOException; /** @@ -172,9 +172,10 @@ OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) OmKeyInfo lookupKey(OmKeyArgs args) throws IOException; /** - * Rename an existing key within a bucket + * Rename an existing key within a bucket. * @param args the args of the key. * @param toKeyName New name to be used for the Key + * @throws IOException */ void renameKey(OmKeyArgs args, String toKeyName) throws IOException; @@ -214,7 +215,7 @@ OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) * @throws IOException */ List listBuckets(String volumeName, - String startBucketName, String bucketPrefix, int maxNumOfBuckets) + String startBucketName, String bucketPrefix, int maxNumOfBuckets) throws IOException; /** @@ -239,7 +240,7 @@ List listBuckets(String volumeName, * @throws IOException */ List listKeys(String volumeName, - String bucketName, String startKeyName, String keyPrefix, int maxKeys) + String bucketName, String startKeyName, String keyPrefix, int maxKeys) throws IOException; /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index e557ac5173..c0829fabb9 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -488,7 +488,7 @@ public void setBucketProperty(OmBucketArgs args) */ @Override public List listBuckets(String volumeName, - String startKey, String prefix, int count) throws IOException { + String startKey, String prefix, int count) throws IOException { List buckets = new ArrayList<>(); ListBucketsRequest.Builder reqBuilder = ListBucketsRequest.newBuilder(); reqBuilder.setVolumeName(volumeName); @@ -554,7 +554,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } @Override - public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) throws IOException { AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder(); KeyArgs keyArgs = KeyArgs.newBuilder() @@ -579,7 +579,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) } @Override - public void commitKey(OmKeyArgs args, int clientID) + public void commitKey(OmKeyArgs args, long clientID) throws IOException { CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder(); List locationInfoList = args.getLocationInfoList(); @@ -708,7 +708,7 @@ public void deleteBucket(String volume, String bucket) throws IOException { */ @Override public List listKeys(String volumeName, String bucketName, - String startKey, String prefix, int maxKeys) throws IOException { + String startKey, String prefix, int maxKeys) throws IOException { List keys = new ArrayList<>(); ListKeysRequest.Builder reqBuilder = ListKeysRequest.newBuilder(); reqBuilder.setVolumeName(volumeName); diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 51a0a7fe4d..242e3b5b83 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -273,7 +273,7 @@ message LocateKeyResponse { optional KeyInfo keyInfo = 2; // clients' followup request may carry this ID for stateful operations (similar // to a cookie). - optional uint32 ID = 3; + optional uint64 ID = 3; // TODO : allow specifiying a particular version to read. optional uint64 openVersion = 4; } @@ -319,7 +319,7 @@ message ListKeysResponse { message AllocateBlockRequest { required KeyArgs keyArgs = 1; - required uint32 clientID = 2; + required uint64 clientID = 2; } message AllocateBlockResponse { @@ -329,7 +329,7 @@ message AllocateBlockResponse { message CommitKeyRequest { required KeyArgs keyArgs = 1; - required uint32 clientID = 2; + required uint64 clientID = 2; } message CommitKeyResponse { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index ffdba7e187..50d7ec54dd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -1,19 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.hadoop.ozone.client.rpc; @@ -69,7 +68,6 @@ public class TestCloseContainerHandlingByClient { private static String bucketName; private static String keyString; - /** * Create a MiniDFSCluster for testing. *

@@ -80,7 +78,7 @@ public class TestCloseContainerHandlingByClient { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); - chunkSize = (int)OzoneConsts.MB; + chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4)); @@ -108,7 +106,7 @@ public static void shutdown() { } private static String fixedLengthString(String string, int length) { - return String.format("%1$"+length+ "s", string); + return String.format("%1$" + length + "s", string); } @Test @@ -288,13 +286,13 @@ private void waitForContainerClose(String keyName, ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) outputStream.getOutputStream(); - int clientId = groupOutputStream.getOpenID(); + long clientId = groupOutputStream.getOpenID(); OMMetadataManager metadataManager = cluster.getOzoneManager().getMetadataManager(); - String objectKey = - metadataManager.getKeyWithDBPrefix(volumeName, bucketName, keyName); - byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientId); - byte[] openKeyData = metadataManager.get(openKey); + byte[] openKey = + metadataManager.getOpenKeyBytes( + volumeName, bucketName, keyName, clientId); + byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey); OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf( OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData)); List locationInfoList = @@ -361,7 +359,6 @@ private void validateData(String keyName, byte[] data) throws Exception { is.close(); } - @Test public void testBlockWriteViaRatis() throws Exception { String keyName = "ratis"; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 45b3843577..f8ad32e82d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -51,6 +51,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -601,6 +602,9 @@ public void testRenameKey() Assert.assertEquals(toKeyName, key.getName()); } + // Listing all volumes in the cluster feature has to be fixed after HDDS-357. + // TODO: fix this + @Ignore @Test public void testListVolume() throws IOException, OzoneException { String volBase = "vol-" + RandomStringUtils.randomNumeric(3); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java index ab26c00141..a3ff6c8068 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java @@ -30,6 +30,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -82,7 +83,8 @@ public class TestOmSQLCli { @Parameterized.Parameters public static Collection data() { return Arrays.asList(new Object[][] { - {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB}, + // Uncomment the below line if we support leveldb in future. + //{OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB}, {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB} }); } @@ -161,6 +163,9 @@ public void shutdown() { } } + // After HDDS-357, we have to fix SQLCli. + // TODO: fix SQLCli + @Ignore @Test public void testOmDB() throws Exception { String dbOutPath = GenericTestUtils.getTempPath( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java index 4908c4daf6..b6ade60190 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java @@ -56,8 +56,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.MetadataKeyFilters; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.db.Table; +import org.apache.hadoop.utils.db.TableIterator; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -75,7 +75,6 @@ import java.net.InetSocketAddress; import java.text.ParseException; import java.util.LinkedList; -import java.util.Map; import java.util.Random; import java.util.Set; import java.util.List; @@ -83,8 +82,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; -import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_CLIENT_ADDRESS_KEY; @@ -631,13 +630,16 @@ public void testDeleteKey() throws IOException, OzoneException { storageHandler.deleteKey(keyArgs); Assert.assertEquals(1 + numKeyDeletes, omMetrics.getNumKeyDeletes()); - // Make sure the deleted key has been renamed. - MetadataStore store = cluster.getOzoneManager(). - getMetadataManager().getStore(); - List> list = store.getRangeKVs(null, 10, - new MetadataKeyFilters.KeyPrefixFilter() - .addFilter(DELETING_KEY_PREFIX)); - Assert.assertEquals(1, list.size()); + // Make sure the deleted key has been moved to the deleted table. + OMMetadataManager manager = cluster.getOzoneManager(). + getMetadataManager(); + + try(TableIterator iter = + manager.getDeletedTable().iterator()) { + iter.seekToFirst(); + Table.KeyValue kv = iter.next(); + Assert.assertNotNull(kv); + } // Delete the key again to test deleting non-existing key. try { @@ -1016,13 +1018,14 @@ public void testListVolumes() throws IOException, OzoneException { storageHandler.createVolume(createVolumeArgs); } - // Test list all volumes + // Test list all volumes - Removed Support for this operation for time + // being. TODO: we will need to bring this back if needed. UserArgs userArgs0 = new UserArgs(user0, OzoneUtils.getRequestID(), null, null, null, null); - listVolumeArgs = new ListArgs(userArgs0, "Vol-testListVolumes", 100, null); - listVolumeArgs.setRootScan(true); - volumes = storageHandler.listVolumes(listVolumeArgs); - Assert.assertEquals(20, volumes.getVolumes().size()); + //listVolumeArgs = new ListArgs(userArgs0,"Vol-testListVolumes", 100, null); + // listVolumeArgs.setRootScan(true); + // volumes = storageHandler.listVolumes(listVolumeArgs); + // Assert.assertEquals(20, volumes.getVolumes().size()); // Test list all volumes belongs to an user listVolumeArgs = new ListArgs(userArgs0, null, 100, null); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestVolume.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestVolume.java index 31f9214ba6..3765bc81b1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestVolume.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestVolume.java @@ -221,6 +221,9 @@ static void runTestChangeQuotaOnVolume(ClientProtocol client) assertTrue(newVol.getCreationTime() > 0); } + // Listing all volumes in the cluster feature has to be fixed after HDDS-357. + // TODO: fix this + @Ignore @Test public void testListVolume() throws OzoneException, IOException { runTestListVolume(client); @@ -305,6 +308,9 @@ static void runTestListAllVolumes(ClientProtocol client) assertEquals(volCount / step, pagecount); } + // Listing all volumes in the cluster feature has to be fixed after HDDS-357. + // TODO: fix this + @Ignore @Test public void testListVolumes() throws Exception { runTestListVolumes(client); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java index 4bbce8101b..d54addddfb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java @@ -18,12 +18,11 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.BucketInfo; -import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo; import org.apache.hadoop.util.Time; import org.iq80.leveldb.DBException; import org.slf4j.Logger; @@ -46,9 +45,10 @@ public class BucketManagerImpl implements BucketManager { /** * Constructs BucketManager. + * * @param metadataManager */ - public BucketManagerImpl(OMMetadataManager metadataManager){ + public BucketManagerImpl(OMMetadataManager metadataManager) { this.metadataManager = metadataManager; } @@ -73,6 +73,7 @@ public BucketManagerImpl(OMMetadataManager metadataManager){ /** * Creates a bucket. + * * @param bucketInfo - OmBucketInfo. */ @Override @@ -86,13 +87,13 @@ public void createBucket(OmBucketInfo bucketInfo) throws IOException { byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); //Check if the volume exists - if (metadataManager.get(volumeKey) == null) { + if (metadataManager.getVolumeTable().get(volumeKey) == null) { LOG.debug("volume: {} not found ", volumeName); throw new OMException("Volume doesn't exist", OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); } //Check if bucket already exists - if (metadataManager.get(bucketKey) != null) { + if (metadataManager.getBucketTable().get(bucketKey) != null) { LOG.debug("bucket: {} already exists ", bucketName); throw new OMException("Bucket already exist", OMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS); @@ -106,7 +107,8 @@ public void createBucket(OmBucketInfo bucketInfo) throws IOException { .setIsVersionEnabled(bucketInfo.getIsVersionEnabled()) .setCreationTime(Time.now()) .build(); - metadataManager.put(bucketKey, omBucketInfo.getProtobuf().toByteArray()); + metadataManager.getBucketTable().put(bucketKey, + omBucketInfo.getProtobuf().toByteArray()); LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName); } catch (IOException | DBException ex) { @@ -134,7 +136,7 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName) metadataManager.readLock().lock(); try { byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); - byte[] value = metadataManager.get(bucketKey); + byte[] value = metadataManager.getBucketTable().get(bucketKey); if (value == null) { LOG.debug("bucket: {} not found in volume: {}.", bucketName, volumeName); @@ -155,8 +157,9 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName) /** * Sets bucket property from args. + * * @param args - BucketArgs. - * @throws IOException + * @throws IOException - On Failure. */ @Override public void setBucketProperty(OmBucketArgs args) throws IOException { @@ -167,15 +170,15 @@ public void setBucketProperty(OmBucketArgs args) throws IOException { try { byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); //Check if volume exists - if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) == - null) { + if (metadataManager.getVolumeTable() + .get(metadataManager.getVolumeKey(volumeName)) == null) { LOG.debug("volume: {} not found ", volumeName); throw new OMException("Volume doesn't exist", OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); } - byte[] value = metadataManager.get(bucketKey); + byte[] value = metadataManager.getBucketTable().get(bucketKey); //Check if bucket exist - if(value == null) { + if (value == null) { LOG.debug("bucket: {} not found ", bucketName); throw new OMException("Bucket doesn't exist", OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); @@ -187,7 +190,7 @@ public void setBucketProperty(OmBucketArgs args) throws IOException { .setBucketName(oldBucketInfo.getBucketName()); //Check ACLs to update - if(args.getAddAcls() != null || args.getRemoveAcls() != null) { + if (args.getAddAcls() != null || args.getRemoveAcls() != null) { bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(), args.getRemoveAcls(), args.getAddAcls())); LOG.debug("Updating ACLs for bucket: {} in volume: {}", @@ -218,7 +221,7 @@ public void setBucketProperty(OmBucketArgs args) throws IOException { } bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime()); - metadataManager.put(bucketKey, + metadataManager.getBucketTable().put(bucketKey, bucketInfoBuilder.build().getProtobuf().toByteArray()); } catch (IOException | DBException ex) { if (!(ex instanceof OMException)) { @@ -242,10 +245,10 @@ public void setBucketProperty(OmBucketArgs args) throws IOException { */ private List getUpdatedAclList(List existingAcls, List removeAcls, List addAcls) { - if(removeAcls != null && !removeAcls.isEmpty()) { + if (removeAcls != null && !removeAcls.isEmpty()) { existingAcls.removeAll(removeAcls); } - if(addAcls != null && !addAcls.isEmpty()) { + if (addAcls != null && !addAcls.isEmpty()) { addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach( existingAcls::add); } @@ -254,9 +257,10 @@ private List getUpdatedAclList(List existingAcls, /** * Deletes an existing empty bucket from volume. + * * @param volumeName - Name of the volume. * @param bucketName - Name of the bucket. - * @throws IOException + * @throws IOException - on Failure. */ public void deleteBucket(String volumeName, String bucketName) throws IOException { @@ -264,16 +268,17 @@ public void deleteBucket(String volumeName, String bucketName) Preconditions.checkNotNull(bucketName); metadataManager.writeLock().lock(); try { - byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); //Check if volume exists - if (metadataManager.get(metadataManager.getVolumeKey(volumeName)) - == null) { + if (metadataManager.getVolumeTable() + .get(metadataManager.getVolumeKey(volumeName)) == null) { LOG.debug("volume: {} not found ", volumeName); throw new OMException("Volume doesn't exist", OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); } - //Check if bucket exist - if (metadataManager.get(bucketKey) == null) { + + //Check if bucket exists + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + if (metadataManager.getBucketTable().get(bucketKey) == null) { LOG.debug("bucket: {} not found ", bucketName); throw new OMException("Bucket doesn't exist", OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); @@ -284,7 +289,7 @@ public void deleteBucket(String volumeName, String bucketName) throw new OMException("Bucket is not empty", OMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY); } - metadataManager.delete(bucketKey); + metadataManager.getBucketTable().delete(bucketKey); } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName, @@ -301,7 +306,7 @@ public void deleteBucket(String volumeName, String bucketName) */ @Override public List listBuckets(String volumeName, - String startBucket, String bucketPrefix, int maxNumOfBuckets) + String startBucket, String bucketPrefix, int maxNumOfBuckets) throws IOException { Preconditions.checkNotNull(volumeName); metadataManager.readLock().lock(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 226c07d6fe..a512d7be00 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -49,7 +49,7 @@ public interface KeyManager { * @param clientID the client that is committing. * @throws IOException */ - void commitKey(OmKeyArgs args, int clientID) throws IOException; + void commitKey(OmKeyArgs args, long clientID) throws IOException; /** * A client calls this on an open key, to request to allocate a new block, @@ -60,7 +60,7 @@ public interface KeyManager { * @return the reference to the new block. * @throws IOException */ - OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) throws IOException; /** * Given the args of a key to put, write an open key entry to meta data. @@ -128,7 +128,7 @@ OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) * @throws IOException */ List listKeys(String volumeName, - String bucketName, String startKey, String keyPrefix, int maxKeys) + String bucketName, String startKey, String keyPrefix, int maxKeys) throws IOException; /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index d0561d6ab9..d5855235a6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -17,24 +17,25 @@ package org.apache.hadoop.ozone.om; import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.om.helpers.OpenKeySession; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.KeyInfo; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.iq80.leveldb.DBException; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,25 +43,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Random; - -import static org.apache.hadoop.ozone - .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB; -import org.apache.hadoop.hdds.protocol - .proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol - .proto.HddsProtos.ReplicationFactor; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB; /** * Implementation of keyManager. @@ -78,13 +67,12 @@ public class KeyManagerImpl implements KeyManager { private final boolean useRatis; private final long preallocateMax; - private final Random random; private final String omId; public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, - OMMetadataManager metadataManager, - OzoneConfiguration conf, - String omId) { + OMMetadataManager metadataManager, + OzoneConfiguration conf, + String omId) { this.scmBlockClient = scmBlockClient; this.metadataManager = metadataManager; this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_IN_MB, @@ -94,11 +82,9 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, this.preallocateMax = conf.getLong( OZONE_KEY_PREALLOCATION_MAXSIZE, OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); - random = new Random(); this.omId = omId; } - @Override public void start() { } @@ -113,13 +99,13 @@ private void validateBucket(String volumeName, String bucketName) byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); //Check if the volume exists - if(metadataManager.get(volumeKey) == null) { + if (metadataManager.getVolumeTable().get(volumeKey) == null) { LOG.error("volume not found: {}", volumeName); throw new OMException("Volume not found", OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); } //Check if bucket already exists - if(metadataManager.get(bucketKey) == null) { + if (metadataManager.getBucketTable().get(bucketKey) == null) { LOG.error("bucket not found: {}/{} ", volumeName, bucketName); throw new OMException("Bucket not found", OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); @@ -127,7 +113,7 @@ private void validateBucket(String volumeName, String bucketName) } @Override - public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) throws IOException { Preconditions.checkNotNull(args); metadataManager.writeLock().lock(); @@ -137,13 +123,13 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) try { validateBucket(volumeName, bucketName); - String objectKey = metadataManager.getKeyWithDBPrefix( - volumeName, bucketName, keyName); - byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID); - byte[] keyData = metadataManager.get(openKey); + byte[] openKey = metadataManager.getOpenKeyBytes( + volumeName, bucketName, keyName, clientID); + + byte[] keyData = metadataManager.getOpenKeyTable().get(openKey); if (keyData == null) { - LOG.error("Allocate block for a key not in open status in meta store " + - objectKey + " with ID " + clientID); + LOG.error("Allocate block for a key not in open status in meta store" + + " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID); throw new OMException("Open Key not found", OMException.ResultCodes.FAILED_KEY_NOT_FOUND); } @@ -162,7 +148,8 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) // the same version keyInfo.appendNewBlocks(Collections.singletonList(info)); keyInfo.updateModifcationTime(); - metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); + metadataManager.getOpenKeyTable().put(openKey, + keyInfo.getProtobuf().toByteArray()); return info; } finally { metadataManager.writeLock().unlock(); @@ -172,28 +159,30 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) @Override public OpenKeySession openKey(OmKeyArgs args) throws IOException { Preconditions.checkNotNull(args); - metadataManager.writeLock().lock(); String volumeName = args.getVolumeName(); String bucketName = args.getBucketName(); + validateBucket(volumeName, bucketName); + + metadataManager.writeLock().lock(); String keyName = args.getKeyName(); ReplicationFactor factor = args.getFactor(); ReplicationType type = args.getType(); + long currentTime = Time.monotonicNowNanos(); // If user does not specify a replication strategy or // replication factor, OM will use defaults. - if(factor == null) { - factor = useRatis ? ReplicationFactor.THREE: ReplicationFactor.ONE; + if (factor == null) { + factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE; } - if(type == null) { + if (type == null) { type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE; } try { - validateBucket(volumeName, bucketName); long requestedSize = Math.min(preallocateMax, args.getDataSize()); List locations = new ArrayList<>(); - String objectKey = metadataManager.getKeyWithDBPrefix( + byte[] objectKey = metadataManager.getOzoneKeyBytes( volumeName, bucketName, keyName); // requested size is not required but more like a optimization: // SCM looks at the requested, if it 0, no block will be allocated at @@ -218,9 +207,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { // value, then this value is used, otherwise, we allocate a single block // which is the current size, if read by the client. long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize; - byte[] keyKey = metadataManager.getDBKeyBytes( - volumeName, bucketName, keyName); - byte[] value = metadataManager.get(keyKey); + byte[] value = metadataManager.getKeyTable().get(objectKey); OmKeyInfo keyInfo; long openVersion; if (value != null) { @@ -233,7 +220,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } else { // the key does not exist, create a new object, the new blocks are the // version 0 - long currentTime = Time.now(); + keyInfo = new OmKeyInfo.Builder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) @@ -248,31 +235,31 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { .build(); openVersion = 0; } - // Generate a random ID which is not already in meta db. - int id = -1; - // in general this should finish in a couple times at most. putting some - // arbitrary large number here to avoid dead loop. - for (int j = 0; j < 10000; j++) { - id = random.nextInt(); - byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, id); - if (metadataManager.get(openKey) == null) { - metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); - break; - } - } - if (id == -1) { - throw new IOException("Failed to find a usable id for " + objectKey); + byte[] openKey = metadataManager.getOpenKeyBytes( + volumeName, bucketName, keyName, currentTime); + if (metadataManager.getOpenKeyTable().get(openKey) != null) { + // This should not happen. If this condition is satisfied, it means + // that we have generated a same openKeyId (i.e. currentTime) for two + // different client who are trying to write the same key at the same + // time. The chance of this happening is very, very minimal. + + // Do we really need this check? Can we avoid this to gain some + // minor performance improvement? + LOG.warn("Cannot allocate key. The generated open key id is already" + + "used for the same key which is currently being written."); + throw new OMException("Cannot allocate key. Not able to get a valid" + + "open key id.", OMException.ResultCodes.FAILED_KEY_ALLOCATION); } + metadataManager.getOpenKeyTable().put(openKey, + keyInfo.getProtobuf().toByteArray()); LOG.debug("Key {} allocated in volume {} bucket {}", keyName, volumeName, bucketName); - return new OpenKeySession(id, keyInfo, openVersion); + return new OpenKeySession(currentTime, keyInfo, openVersion); } catch (OMException e) { throw e; } catch (IOException ex) { - if (!(ex instanceof OMException)) { - LOG.error("Key open failed for volume:{} bucket:{} key:{}", - volumeName, bucketName, keyName, ex); - } + LOG.error("Key open failed for volume:{} bucket:{} key:{}", + volumeName, bucketName, keyName, ex); throw new OMException(ex.getMessage(), OMException.ResultCodes.FAILED_KEY_ALLOCATION); } finally { @@ -281,7 +268,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } @Override - public void commitKey(OmKeyArgs args, int clientID) throws IOException { + public void commitKey(OmKeyArgs args, long clientID) throws IOException { Preconditions.checkNotNull(args); metadataManager.writeLock().lock(); String volumeName = args.getVolumeName(); @@ -289,15 +276,14 @@ public void commitKey(OmKeyArgs args, int clientID) throws IOException { String keyName = args.getKeyName(); try { validateBucket(volumeName, bucketName); - String objectKey = metadataManager.getKeyWithDBPrefix( + byte[] openKey = metadataManager.getOpenKeyBytes(volumeName, bucketName, + keyName, clientID); + byte[] objectKey = metadataManager.getOzoneKeyBytes( volumeName, bucketName, keyName); - byte[] objectKeyBytes = metadataManager.getDBKeyBytes(volumeName, - bucketName, keyName); - byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID); - byte[] openKeyData = metadataManager.get(openKey); + byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey); if (openKeyData == null) { throw new OMException("Commit a key without corresponding entry " + - DFSUtil.bytes2String(openKey), ResultCodes.FAILED_KEY_NOT_FOUND); + DFSUtil.bytes2String(objectKey), ResultCodes.FAILED_KEY_NOT_FOUND); } OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData)); @@ -305,12 +291,13 @@ public void commitKey(OmKeyArgs args, int clientID) throws IOException { keyInfo.setModificationTime(Time.now()); List locationInfoList = args.getLocationInfoList(); Preconditions.checkNotNull(locationInfoList); + //update the block length for each block keyInfo.updateLocationInfoList(locationInfoList); - BatchOperation batch = new BatchOperation(); - batch.delete(openKey); - batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray()); - metadataManager.writeBatch(batch); + metadataManager.getStore().move(openKey, objectKey, + keyInfo.getProtobuf().toByteArray(), + metadataManager.getOpenKeyTable(), + metadataManager.getKeyTable()); } catch (OMException e) { throw e; } catch (IOException ex) { @@ -331,9 +318,9 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { String bucketName = args.getBucketName(); String keyName = args.getKeyName(); try { - byte[] keyKey = metadataManager.getDBKeyBytes( + byte[] keyBytes = metadataManager.getOzoneKeyBytes( volumeName, bucketName, keyName); - byte[] value = metadataManager.get(keyKey); + byte[] value = metadataManager.getKeyTable().get(keyBytes); if (value == null) { LOG.debug("volume:{} bucket:{} Key:{} not found", volumeName, bucketName, keyName); @@ -341,7 +328,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { OMException.ResultCodes.FAILED_KEY_NOT_FOUND); } return OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value)); - } catch (DBException ex) { + } catch (IOException ex) { LOG.error("Get key failed for volume:{} bucket:{} key:{}", volumeName, bucketName, keyName, ex); throw new OMException(ex.getMessage(), @@ -368,9 +355,9 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { metadataManager.writeLock().lock(); try { // fromKeyName should exist - byte[] fromKey = metadataManager.getDBKeyBytes( + byte[] fromKey = metadataManager.getOzoneKeyBytes( volumeName, bucketName, fromKeyName); - byte[] fromKeyValue = metadataManager.get(fromKey); + byte[] fromKeyValue = metadataManager.getKeyTable().get(fromKey); if (fromKeyValue == null) { // TODO: Add support for renaming open key LOG.error( @@ -381,10 +368,20 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { OMException.ResultCodes.FAILED_KEY_NOT_FOUND); } + // A rename is a no-op if the target and source name is same. + // TODO: Discuss if we need to throw?. + // TODO: Define the semantics of rename more clearly. Today this code + // will allow rename of a Key across volumes. This should *not* be + // allowed. The documentation of Ozone says that rename is permitted only + // within a volume. + if (fromKeyName.equals(toKeyName)) { + return; + } + // toKeyName should not exist byte[] toKey = - metadataManager.getDBKeyBytes(volumeName, bucketName, toKeyName); - byte[] toKeyValue = metadataManager.get(toKey); + metadataManager.getOzoneKeyBytes(volumeName, bucketName, toKeyName); + byte[] toKeyValue = metadataManager.getKeyTable().get(toKey); if (toKeyValue != null) { LOG.error( "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. " @@ -394,19 +391,18 @@ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { OMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS); } - if (fromKeyName.equals(toKeyName)) { - return; - } OmKeyInfo newKeyInfo = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue)); newKeyInfo.setKeyName(toKeyName); newKeyInfo.updateModifcationTime(); - BatchOperation batch = new BatchOperation(); - batch.delete(fromKey); - batch.put(toKey, newKeyInfo.getProtobuf().toByteArray()); - metadataManager.writeBatch(batch); - } catch (DBException ex) { + try (WriteBatch batch = new WriteBatch()) { + batch.delete(metadataManager.getKeyTable().getHandle(), fromKey); + batch.put(metadataManager.getKeyTable().getHandle(), toKey, + newKeyInfo.getProtobuf().toByteArray()); + metadataManager.getStore().write(batch); + } + } catch (RocksDBException | IOException ex) { LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}", volumeName, bucketName, fromKeyName, toKeyName, ex); throw new OMException(ex.getMessage(), @@ -424,19 +420,19 @@ public void deleteKey(OmKeyArgs args) throws IOException { String bucketName = args.getBucketName(); String keyName = args.getKeyName(); try { - byte[] objectKey = metadataManager.getDBKeyBytes( + byte[] objectKey = metadataManager.getOzoneKeyBytes( volumeName, bucketName, keyName); - byte[] objectValue = metadataManager.get(objectKey); + byte[] objectValue = metadataManager.getKeyTable().get(objectKey); if (objectValue == null) { throw new OMException("Key not found", OMException.ResultCodes.FAILED_KEY_NOT_FOUND); } - byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey); - BatchOperation batch = new BatchOperation(); - batch.put(deletingKey, objectValue); - batch.delete(objectKey); - metadataManager.writeBatch(batch); - } catch (DBException ex) { + metadataManager.getStore().move(objectKey, + metadataManager.getKeyTable(), + metadataManager.getDeletedTable()); + } catch (OMException ex) { + throw ex; + } catch (IOException ex) { LOG.error(String.format("Delete key failed for volume:%s " + "bucket:%s key:%s", volumeName, bucketName, keyName), ex); throw new OMException(ex.getMessage(), ex, @@ -448,53 +444,30 @@ public void deleteKey(OmKeyArgs args) throws IOException { @Override public List listKeys(String volumeName, String bucketName, - String startKey, String keyPrefix, + String startKey, String keyPrefix, int maxKeys) throws IOException { Preconditions.checkNotNull(volumeName); Preconditions.checkNotNull(bucketName); - metadataManager.readLock().lock(); - try { - return metadataManager.listKeys(volumeName, bucketName, - startKey, keyPrefix, maxKeys); - } finally { - metadataManager.readLock().unlock(); - } + // We don't take a lock in this path, since we walk the + // underlying table using an iterator. That automatically creates a + // snapshot of the data, so we don't need these locks at a higher level + // when we iterate. + return metadataManager.listKeys(volumeName, bucketName, + startKey, keyPrefix, maxKeys); } @Override public List getPendingDeletionKeys(final int count) throws IOException { - metadataManager.readLock().lock(); - try { - return metadataManager.getPendingDeletionKeys(count); - } finally { - metadataManager.readLock().unlock(); - } + //TODO: Fix this in later patches. + return null; } @Override public void deletePendingDeletionKey(String objectKeyName) - throws IOException{ - Preconditions.checkNotNull(objectKeyName); - if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) { - throw new IllegalArgumentException("Invalid key name," - + " the name should be the key name with deleting prefix"); - } - - // Simply removes the entry from OM DB. - metadataManager.writeLock().lock(); - try { - byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName); - byte[] delKeyValue = metadataManager.get(pendingDelKey); - if (delKeyValue == null) { - throw new IOException("Failed to delete key " + objectKeyName - + " because it is not found in DB"); - } - metadataManager.delete(pendingDelKey); - } finally { - metadataManager.writeLock().unlock(); - } + throws IOException { + // TODO : Fix in later patches. } @Override @@ -510,23 +483,6 @@ public List getExpiredOpenKeys() throws IOException { @Override public void deleteExpiredOpenKey(String objectKeyName) throws IOException { Preconditions.checkNotNull(objectKeyName); - if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) { - throw new IllegalArgumentException("Invalid key name," - + " the name should be the key name with open key prefix"); - } - - // Simply removes the entry from OM DB. - metadataManager.writeLock().lock(); - try { - byte[] openKey = DFSUtil.string2Bytes(objectKeyName); - byte[] delKeyValue = metadataManager.get(openKey); - if (delKeyValue == null) { - throw new IOException("Failed to delete key " + objectKeyName - + " because it is not found in DB"); - } - metadataManager.delete(openKey); - } finally { - metadataManager.writeLock().unlock(); - } + // TODO: Fix this in later patches. } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index f2e78e661c..0e9ae42692 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -17,12 +17,12 @@ package org.apache.hadoop.ozone.om; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.db.DBStore; +import org.apache.hadoop.utils.db.Table; import java.io.IOException; import java.util.List; @@ -40,68 +40,47 @@ public interface OMMetadataManager { /** * Stop metadata manager. */ - void stop() throws IOException; + void stop() throws Exception; /** * Get metadata store. + * * @return metadata store. */ @VisibleForTesting - MetadataStore getStore(); + DBStore getStore(); /** * Returns the read lock used on Metadata DB. + * * @return readLock */ Lock readLock(); /** * Returns the write lock used on Metadata DB. + * * @return writeLock */ Lock writeLock(); - /** - * Returns the value associated with this key. - * @param key - key - * @return value - */ - byte[] get(byte[] key) throws IOException; - - /** - * Puts a Key into Metadata DB. - * @param key - key - * @param value - value - */ - void put(byte[] key, byte[] value) throws IOException; - - /** - * Deletes a Key from Metadata DB. - * @param key - key - */ - void delete(byte[] key) throws IOException; - - /** - * Atomic write a batch of operations. - * @param batch - * @throws IOException - */ - void writeBatch(BatchOperation batch) throws IOException; - /** * Given a volume return the corresponding DB key. + * * @param volume - Volume name */ byte[] getVolumeKey(String volume); /** * Given a user return the corresponding DB key. + * * @param user - User name */ byte[] getUserKey(String user); /** * Given a volume and bucket, return the corresponding DB key. + * * @param volume - User name * @param bucket - Bucket name */ @@ -109,131 +88,103 @@ public interface OMMetadataManager { /** * Given a volume, bucket and a key, return the corresponding DB key. + * * @param volume - volume name * @param bucket - bucket name * @param key - key name * @return bytes of DB key. */ - byte[] getDBKeyBytes(String volume, String bucket, String key); + byte[] getOzoneKeyBytes(String volume, String bucket, String key); /** - * Returns the DB key name of a deleted key in OM metadata store. - * The name for a deleted key has prefix #deleting# followed by - * the actual key name. - * @param keyName - key name - * @return bytes of DB key. - */ - byte[] getDeletedKeyName(byte[] keyName); - - /** - * Returns the DB key name of a open key in OM metadata store. - * Should be #open# prefix followed by actual key name. - * @param keyName - key name + * Returns the DB key name of a open key in OM metadata store. Should be + * #open# prefix followed by actual key name. + * + * @param volume - volume name + * @param bucket - bucket name + * @param key - key name * @param id - the id for this open * @return bytes of DB key. */ - byte[] getOpenKeyNameBytes(String keyName, int id); + byte[] getOpenKeyBytes(String volume, String bucket, String key, long id); /** - * Returns the full name of a key given volume name, bucket name and key name. - * Generally done by padding certain delimiters. + * Given a volume, check if it is empty, i.e there are no buckets inside it. * - * @param volumeName - volume name - * @param bucketName - bucket name - * @param keyName - key name - * @return the full key name. - */ - String getKeyWithDBPrefix(String volumeName, String bucketName, - String keyName); - - /** - * Given a volume, check if it is empty, - * i.e there are no buckets inside it. * @param volume - Volume name */ boolean isVolumeEmpty(String volume) throws IOException; /** - * Given a volume/bucket, check if it is empty, - * i.e there are no keys inside it. + * Given a volume/bucket, check if it is empty, i.e there are no keys inside + * it. + * * @param volume - Volume name - * @param bucket - Bucket name + * @param bucket - Bucket name * @return true if the bucket is empty */ boolean isBucketEmpty(String volume, String bucket) throws IOException; /** - * Returns a list of buckets represented by {@link OmBucketInfo} - * in the given volume. + * Returns a list of buckets represented by {@link OmBucketInfo} in the given + * volume. * - * @param volumeName - * the name of the volume. This argument is required, - * this method returns buckets in this given volume. - * @param startBucket - * the start bucket name. Only the buckets whose name is - * after this value will be included in the result. - * This key is excluded from the result. - * @param bucketPrefix - * bucket name prefix. Only the buckets whose name has - * this prefix will be included in the result. - * @param maxNumOfBuckets - * the maximum number of buckets to return. It ensures - * the size of the result will not exceed this limit. + * @param volumeName the name of the volume. This argument is required, this + * method returns buckets in this given volume. + * @param startBucket the start bucket name. Only the buckets whose name is + * after this value will be included in the result. This key is excluded from + * the result. + * @param bucketPrefix bucket name prefix. Only the buckets whose name has + * this prefix will be included in the result. + * @param maxNumOfBuckets the maximum number of buckets to return. It ensures + * the size of the result will not exceed this limit. * @return a list of buckets. * @throws IOException */ List listBuckets(String volumeName, String startBucket, - String bucketPrefix, int maxNumOfBuckets) throws IOException; + String bucketPrefix, int maxNumOfBuckets) + throws IOException; /** - * Returns a list of keys represented by {@link OmKeyInfo} - * in the given bucket. + * Returns a list of keys represented by {@link OmKeyInfo} in the given + * bucket. * - * @param volumeName - * the name of the volume. - * @param bucketName - * the name of the bucket. - * @param startKey - * the start key name, only the keys whose name is - * after this value will be included in the result. - * This key is excluded from the result. - * @param keyPrefix - * key name prefix, only the keys whose name has - * this prefix will be included in the result. - * @param maxKeys - * the maximum number of keys to return. It ensures - * the size of the result will not exceed this limit. + * @param volumeName the name of the volume. + * @param bucketName the name of the bucket. + * @param startKey the start key name, only the keys whose name is after this + * value will be included in the result. This key is excluded from the + * result. + * @param keyPrefix key name prefix, only the keys whose name has this prefix + * will be included in the result. + * @param maxKeys the maximum number of keys to return. It ensures the size of + * the result will not exceed this limit. * @return a list of keys. * @throws IOException */ List listKeys(String volumeName, - String bucketName, String startKey, String keyPrefix, int maxKeys) + String bucketName, String startKey, String keyPrefix, int maxKeys) throws IOException; /** - * Returns a list of volumes owned by a given user; if user is null, - * returns all volumes. + * Returns a list of volumes owned by a given user; if user is null, returns + * all volumes. * - * @param userName - * volume owner - * @param prefix - * the volume prefix used to filter the listing result. - * @param startKey - * the start volume name determines where to start listing from, - * this key is excluded from the result. - * @param maxKeys - * the maximum number of volumes to return. + * @param userName volume owner + * @param prefix the volume prefix used to filter the listing result. + * @param startKey the start volume name determines where to start listing + * from, this key is excluded from the result. + * @param maxKeys the maximum number of volumes to return. * @return a list of {@link OmVolumeArgs} * @throws IOException */ List listVolumes(String userName, String prefix, - String startKey, int maxKeys) throws IOException; + String startKey, int maxKeys) throws IOException; /** * Returns a list of pending deletion key info that ups to the given count. - * Each entry is a {@link BlockGroup}, which contains the info about the - * key name and all its associated block IDs. A pending deletion key is - * stored with #deleting# prefix in OM DB. + * Each entry is a {@link BlockGroup}, which contains the info about the key + * name and all its associated block IDs. A pending deletion key is stored + * with #deleting# prefix in OM DB. * * @param count max number of keys to return. * @return a list of {@link BlockGroup} represent keys and blocks. @@ -250,4 +201,47 @@ List listVolumes(String userName, String prefix, * @throws IOException */ List getExpiredOpenKeys() throws IOException; + + /** + * Returns the user Table. + * + * @return UserTable. + */ + Table getUserTable(); + + /** + * Returns the Volume Table. + * + * @return VolumeTable. + */ + Table getVolumeTable(); + + /** + * Returns the BucketTable. + * + * @return BucketTable. + */ + Table getBucketTable(); + + /** + * Returns the KeyTable. + * + * @return KeyTable. + */ + Table getKeyTable(); + + /** + * Get Deleted Table. + * + * @return Deleted Table. + */ + Table getDeletedTable(); + + /** + * Gets the OpenKeyTable. + * + * @return Table. + */ + Table getOpenKeyTable(); + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index 21d24114c8..151fddf1cc 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -19,77 +19,178 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; -import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; - import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataKeyFilters; -import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.apache.hadoop.utils.db.DBStore; +import org.apache.hadoop.utils.db.DBStoreBuilder; +import org.apache.hadoop.utils.db.Table; +import org.apache.hadoop.utils.db.TableIterator; +import org.eclipse.jetty.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.ArrayList; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; -import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR; -import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_DB_CACHE_SIZE_MB; -import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; /** * Ozone metadata manager interface. */ public class OmMetadataManagerImpl implements OMMetadataManager { + private static final Logger LOG = + LoggerFactory.getLogger(OmMetadataManagerImpl.class); - private final MetadataStore store; + /** + * OM RocksDB Structure . + *

+ * OM DB stores metadata as KV pairs in different column families. + *

+ * OM DB Schema: + * |-------------------------------------------------------------------| + * | Column Family | VALUE | + * |-------------------------------------------------------------------| + * | userTable | user->VolumeList | + * |-------------------------------------------------------------------| + * | volumeTable | /volume->VolumeInfo | + * |-------------------------------------------------------------------| + * | bucketTable | /volume/bucket-> BucketInfo | + * |-------------------------------------------------------------------| + * | keyTable | /volumeName/bucketName/keyName->KeyInfo | + * |-------------------------------------------------------------------| + * | deletedTable | /volumeName/bucketName/keyName->KeyInfo | + * |-------------------------------------------------------------------| + * | openKey | /volumeName/bucketName/keyName/id->KeyInfo | + * |-------------------------------------------------------------------| + */ + + private static final String USER_TABLE = "userTable"; + private static final String VOLUME_TABLE = "volumeTable"; + private static final String BUCKET_TABLE = "bucketTable"; + private static final String KEY_TABLE = "keyTable"; + private static final String DELETED_TABLE = "deletedTable"; + private static final String OPEN_KEY_TABLE = "openKeyTable"; + + private final DBStore store; + + // TODO: Make this lock move into Table instead of *ONE* lock for the whole + // DB. private final ReadWriteLock lock; private final long openKeyExpireThresholdMS; + private final Table userTable; + private final Table volumeTable; + private final Table bucketTable; + private final Table keyTable; + private final Table deletedTable; + private final Table openKeyTable; + public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException { File metaDir = getOzoneMetaDirPath(conf); - final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB, - OZONE_OM_DB_CACHE_SIZE_DEFAULT); - File omDBFile = new File(metaDir.getPath(), OM_DB_NAME); - this.store = MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(omDBFile) - .setCacheSize(cacheSize * OzoneConsts.MB) - .build(); this.lock = new ReentrantReadWriteLock(); this.openKeyExpireThresholdMS = 1000 * conf.getInt( OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT); + + this.store = DBStoreBuilder.newBuilder(conf) + .setName(OM_DB_NAME) + .setPath(Paths.get(metaDir.getPath())) + .addTable(USER_TABLE) + .addTable(VOLUME_TABLE) + .addTable(BUCKET_TABLE) + .addTable(KEY_TABLE) + .addTable(DELETED_TABLE) + .addTable(OPEN_KEY_TABLE) + .build(); + + userTable = this.store.getTable(USER_TABLE); + checkTableStatus(userTable, USER_TABLE); + + volumeTable = this.store.getTable(VOLUME_TABLE); + checkTableStatus(volumeTable, VOLUME_TABLE); + + bucketTable = this.store.getTable(BUCKET_TABLE); + checkTableStatus(bucketTable, BUCKET_TABLE); + + keyTable = this.store.getTable(KEY_TABLE); + checkTableStatus(keyTable, KEY_TABLE); + + deletedTable = this.store.getTable(DELETED_TABLE); + checkTableStatus(deletedTable, DELETED_TABLE); + + openKeyTable = this.store.getTable(OPEN_KEY_TABLE); + checkTableStatus(openKeyTable, OPEN_KEY_TABLE); + + } + + @Override + public Table getUserTable() { + return userTable; + } + + @Override + public Table getVolumeTable() { + return volumeTable; + } + + @Override + public Table getBucketTable() { + return bucketTable; + } + + @Override + public Table getKeyTable() { + return keyTable; + } + + @Override + public Table getDeletedTable() { + return deletedTable; + } + + @Override + public Table getOpenKeyTable() { + return openKeyTable; + } + + private void checkTableStatus(Table table, String name) throws IOException { + String logMessage = "Unable to get a reference to %s table. Cannot " + + "continue."; + String errMsg = "Inconsistent DB state, Table - %s. Please check the logs" + + "for more info."; + if (table == null) { + LOG.error(String.format(logMessage, name)); + throw new IOException(String.format(errMsg, name)); + } } /** @@ -104,7 +205,7 @@ public void start() { * Stop metadata manager. */ @Override - public void stop() throws IOException { + public void stop() throws Exception { if (store != null) { store.close(); } @@ -112,86 +213,75 @@ public void stop() throws IOException { /** * Get metadata store. + * * @return store - metadata store. */ @VisibleForTesting @Override - public MetadataStore getStore() { + public DBStore getStore() { return store; } /** * Given a volume return the corresponding DB key. + * * @param volume - Volume name */ + @Override public byte[] getVolumeKey(String volume) { - String dbVolumeName = OzoneConsts.OM_VOLUME_PREFIX + volume; - return DFSUtil.string2Bytes(dbVolumeName); + return DFSUtil.string2Bytes(OzoneConsts.OM_KEY_PREFIX + volume); } /** * Given a user return the corresponding DB key. + * * @param user - User name */ + @Override public byte[] getUserKey(String user) { - String dbUserName = OzoneConsts.OM_USER_PREFIX + user; - return DFSUtil.string2Bytes(dbUserName); + return DFSUtil.string2Bytes(user); } /** * Given a volume and bucket, return the corresponding DB key. + * * @param volume - User name * @param bucket - Bucket name */ + @Override public byte[] getBucketKey(String volume, String bucket) { - String bucketKeyString = OzoneConsts.OM_VOLUME_PREFIX + volume - + OzoneConsts.OM_BUCKET_PREFIX + bucket; - return DFSUtil.string2Bytes(bucketKeyString); - } + StringBuilder builder = + new StringBuilder().append(OM_KEY_PREFIX).append(volume); - /** - * @param volume - * @param bucket - * @return - */ - private String getBucketWithDBPrefix(String volume, String bucket) { - StringBuffer sb = new StringBuffer(); - sb.append(OzoneConsts.OM_VOLUME_PREFIX) - .append(volume) - .append(OzoneConsts.OM_BUCKET_PREFIX); - if (!Strings.isNullOrEmpty(bucket)) { - sb.append(bucket); + if (StringUtils.isNotBlank(bucket)) { + builder.append(OM_KEY_PREFIX).append(bucket); } - return sb.toString(); + return DFSUtil.string2Bytes(builder.toString()); } @Override - public String getKeyWithDBPrefix(String volume, String bucket, String key) { - String keyVB = OzoneConsts.OM_KEY_PREFIX + volume - + OzoneConsts.OM_KEY_PREFIX + bucket - + OzoneConsts.OM_KEY_PREFIX; - return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key; + public byte[] getOzoneKeyBytes(String volume, String bucket, String key) { + StringBuilder builder = new StringBuilder() + .append(OM_KEY_PREFIX).append(volume); + // TODO : Throw if the Bucket is null? + builder.append(OM_KEY_PREFIX).append(bucket); + if (StringUtil.isNotBlank(key)) { + builder.append(OM_KEY_PREFIX).append(key); + } + return DFSUtil.string2Bytes(builder.toString()); } @Override - public byte[] getDBKeyBytes(String volume, String bucket, String key) { - return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key)); - } - - @Override - public byte[] getDeletedKeyName(byte[] keyName) { - return DFSUtil.string2Bytes( - DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName)); - } - - @Override - public byte[] getOpenKeyNameBytes(String keyName, int id) { - return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id + - OPEN_KEY_ID_DELIMINATOR + keyName); + public byte[] getOpenKeyBytes(String volume, String bucket, + String key, long id) { + String openKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket + + OM_KEY_PREFIX + key + OM_KEY_PREFIX + id; + return DFSUtil.string2Bytes(openKey); } /** * Returns the read lock used on Metadata DB. + * * @return readLock */ @Override @@ -201,6 +291,7 @@ public Lock readLock() { /** * Returns the write lock used on Metadata DB. + * * @return writeLock */ @Override @@ -209,71 +300,79 @@ public Lock writeLock() { } /** - * Returns the value associated with this key. - * @param key - key - * @return value + * Returns true if the firstArray startsWith the bytes of secondArray. + * + * @param firstArray - Byte array + * @param secondArray - Byte array + * @return true if the first array bytes match the bytes in the second array. */ - @Override - public byte[] get(byte[] key) throws IOException { - return store.get(key); - } + private boolean startsWith(byte[] firstArray, byte[] secondArray) { - /** - * Puts a Key into Metadata DB. - * @param key - key - * @param value - value - */ - @Override - public void put(byte[] key, byte[] value) throws IOException { - store.put(key, value); - } + if (firstArray == null) { + // if both are null, then the arrays match, else if first is null and + // second is not, then this function returns false. + return secondArray == null; + } - /** - * Deletes a Key from Metadata DB. - * @param key - key - */ - public void delete(byte[] key) throws IOException { - store.delete(key); - } - @Override - public void writeBatch(BatchOperation batch) throws IOException { - this.store.writeBatch(batch); + if (secondArray != null) { + // If the second array is longer then first array cannot be starting with + // the bytes of second array. + if (secondArray.length > firstArray.length) { + return false; + } + + for (int ndx = 0; ndx < secondArray.length; ndx++) { + if (firstArray[ndx] != secondArray[ndx]) { + return false; + } + } + return true; //match, return true. + } + return false; // if first is not null and second is null, we define that + // array does not start with same chars. } /** * Given a volume, check if it is empty, i.e there are no buckets inside it. + * We iterate in the bucket table and see if there is any key that starts with + * the volume prefix. We actually look for /volume/, since if we don't have + * the trailing slash it is possible that we might match some other volume. + *

+ * For example, vol1 and vol122 might match, to avoid that we look for /vol1/ + * * @param volume - Volume name * @return true if the volume is empty */ + @Override public boolean isVolumeEmpty(String volume) throws IOException { - String dbVolumeRootName = OzoneConsts.OM_VOLUME_PREFIX + volume - + OzoneConsts.OM_BUCKET_PREFIX; - byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName); - ImmutablePair volumeRoot = - store.peekAround(0, dbVolumeRootKey); - if (volumeRoot != null) { - return !DFSUtil.bytes2String(volumeRoot.getKey()) - .startsWith(dbVolumeRootName); + byte[] volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX); + try (TableIterator bucketIter = bucketTable.iterator()) { + Table.KeyValue kv = bucketIter.seek(volumePrefix); + if (kv != null && startsWith(kv.getKey(), volumePrefix)) { + return false; // we found at least one bucket with this volume prefix. + } } return true; } /** - * Given a volume/bucket, check if it is empty, - * i.e there are no keys inside it. + * Given a volume/bucket, check if it is empty, i.e there are no keys inside + * it. Prefix is /volume/bucket/, and we lookup the keyTable. + * * @param volume - Volume name * @param bucket - Bucket name * @return true if the bucket is empty */ + @Override public boolean isBucketEmpty(String volume, String bucket) throws IOException { - String keyRootName = getKeyWithDBPrefix(volume, bucket, null); - byte[] keyRoot = DFSUtil.string2Bytes(keyRootName); - ImmutablePair firstKey = store.peekAround(0, keyRoot); - if (firstKey != null) { - return !DFSUtil.bytes2String(firstKey.getKey()) - .startsWith(keyRootName); + byte[] keyPrefix = getBucketKey(volume, bucket + OM_KEY_PREFIX); + try (TableIterator keyIter = keyTable.iterator()) { + Table.KeyValue kv = keyIter.seek(keyPrefix); + if (kv != null && startsWith(kv.getKey(), keyPrefix)) { + return false; // we found at least one key with this vol/bucket prefix. + } } return true; } @@ -283,8 +382,8 @@ public boolean isBucketEmpty(String volume, String bucket) */ @Override public List listBuckets(final String volumeName, - final String startBucket, final String bucketPrefix, - final int maxNumOfBuckets) throws IOException { + final String startBucket, final String bucketPrefix, + final int maxNumOfBuckets) throws IOException { List result = new ArrayList<>(); if (Strings.isNullOrEmpty(volumeName)) { throw new OMException("Volume name is required.", @@ -292,49 +391,61 @@ public List listBuckets(final String volumeName, } byte[] volumeNameBytes = getVolumeKey(volumeName); - if (store.get(volumeNameBytes) == null) { + if (volumeTable.get(volumeNameBytes) == null) { throw new OMException("Volume " + volumeName + " not found.", ResultCodes.FAILED_VOLUME_NOT_FOUND); } - // A bucket starts with /#volume/#bucket_prefix - MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> { - if (currentKey != null) { - String bucketNamePrefix = - getBucketWithDBPrefix(volumeName, bucketPrefix); - String bucket = DFSUtil.bytes2String(currentKey); - return bucket.startsWith(bucketNamePrefix); - } - return false; - }; - - List> rangeResult; - if (!Strings.isNullOrEmpty(startBucket)) { - // Since we are excluding start key from the result, - // the maxNumOfBuckets is incremented. - rangeResult = store.getSequentialRangeKVs( - getBucketKey(volumeName, startBucket), - maxNumOfBuckets + 1, filter); - if (!rangeResult.isEmpty()) { - //Remove start key from result. - rangeResult.remove(0); - } + byte[] startKey; + boolean skipStartKey = false; + if (StringUtil.isNotBlank(startBucket)) { + // if the user has specified a start key, we need to seek to that key + // and avoid that key in the response set. + startKey = getBucketKey(volumeName, startBucket); + skipStartKey = true; } else { - rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter); + // If the user has specified a prefix key, we need to get to the first + // of the keys with the prefix match. We can leverage RocksDB to do that. + // However, if the user has specified only a prefix, we cannot skip + // the first prefix key we see, the boolean skipStartKey allows us to + // skip the startkey or not depending on what patterns are specified. + startKey = getBucketKey(volumeName, bucketPrefix); } - for (Map.Entry entry : rangeResult) { - OmBucketInfo info = OmBucketInfo.getFromProtobuf( - BucketInfo.parseFrom(entry.getValue())); - result.add(info); + byte[] seekPrefix; + if (StringUtil.isNotBlank(bucketPrefix)) { + seekPrefix = getBucketKey(volumeName, bucketPrefix); + } else { + seekPrefix = getVolumeKey(volumeName + OM_KEY_PREFIX); + } + int currentCount = 0; + try (TableIterator bucketIter = bucketTable.iterator()) { + Table.KeyValue kv = bucketIter.seek(startKey); + while (currentCount < maxNumOfBuckets && bucketIter.hasNext()) { + kv = bucketIter.next(); + // Skip the Start Bucket if needed. + if (kv != null && skipStartKey && + Arrays.equals(kv.getKey(), startKey)) { + continue; + } + if (kv != null && startsWith(kv.getKey(), seekPrefix)) { + result.add(OmBucketInfo.getFromProtobuf( + BucketInfo.parseFrom(kv.getValue()))); + currentCount++; + } else { + // The SeekPrefix does not match any more, we can break out of the + // loop. + break; + } + } } return result; } @Override public List listKeys(String volumeName, String bucketName, - String startKey, String keyPrefix, int maxKeys) throws IOException { + String startKey, String keyPrefix, int maxKeys) throws IOException { List result = new ArrayList<>(); if (Strings.isNullOrEmpty(volumeName)) { throw new OMException("Volume name is required.", @@ -347,47 +458,61 @@ public List listKeys(String volumeName, String bucketName, } byte[] bucketNameBytes = getBucketKey(volumeName, bucketName); - if (store.get(bucketNameBytes) == null) { + if (getBucketTable().get(bucketNameBytes) == null) { throw new OMException("Bucket " + bucketName + " not found.", ResultCodes.FAILED_BUCKET_NOT_FOUND); } - MetadataKeyFilter filter = new KeyPrefixFilter() - .addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix)); - - List> rangeResult; - if (!Strings.isNullOrEmpty(startKey)) { - //Since we are excluding start key from the result, - // the maxNumOfBuckets is incremented. - rangeResult = store.getSequentialRangeKVs( - getDBKeyBytes(volumeName, bucketName, startKey), - maxKeys + 1, filter); - if (!rangeResult.isEmpty()) { - //Remove start key from result. - rangeResult.remove(0); - } + byte[] seekKey; + boolean skipStartKey = false; + if (StringUtil.isNotBlank(startKey)) { + // Seek to the specified key. + seekKey = getOzoneKeyBytes(volumeName, bucketName, startKey); + skipStartKey = true; } else { - rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter); + // This allows us to seek directly to the first key with the right prefix. + seekKey = getOzoneKeyBytes(volumeName, bucketName, keyPrefix); } - for (Map.Entry entry : rangeResult) { - OmKeyInfo info = OmKeyInfo.getFromProtobuf( - KeyInfo.parseFrom(entry.getValue())); - result.add(info); + byte[] seekPrefix; + if (StringUtil.isNotBlank(keyPrefix)) { + seekPrefix = getOzoneKeyBytes(volumeName, bucketName, keyPrefix); + } else { + seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX); + } + int currentCount = 0; + try (TableIterator keyIter = getKeyTable().iterator()) { + Table.KeyValue kv = keyIter.seek(seekKey); + while (currentCount < maxKeys && keyIter.hasNext()) { + kv = keyIter.next(); + // Skip the Start key if needed. + if (kv != null && skipStartKey && Arrays.equals(kv.getKey(), seekKey)) { + continue; + } + if (kv != null && startsWith(kv.getKey(), seekPrefix)) { + result.add(OmKeyInfo.getFromProtobuf( + KeyInfo.parseFrom(kv.getValue()))); + currentCount++; + } else { + // The SeekPrefix does not match any more, we can break out of the + // loop. + break; + } + } } return result; } @Override public List listVolumes(String userName, - String prefix, String startKey, int maxKeys) throws IOException { + String prefix, String startKey, int maxKeys) throws IOException { List result = Lists.newArrayList(); VolumeList volumes; - if (Strings.isNullOrEmpty(userName)) { - volumes = getAllVolumes(); - } else { - volumes = getVolumesByUser(userName); + if (StringUtil.isBlank(userName)) { + throw new OMException("User name is required to list Volumes.", + ResultCodes.FAILED_USER_NOT_FOUND); } + volumes = getVolumesByUser(userName); if (volumes == null || volumes.getVolumeNamesCount() == 0) { return result; @@ -406,7 +531,7 @@ public List listVolumes(String userName, continue; } if (startKeyFound && result.size() < maxKeys) { - byte[] volumeInfo = store.get(this.getVolumeKey(volumeName)); + byte[] volumeInfo = getVolumeTable().get(this.getVolumeKey(volumeName)); if (volumeInfo == null) { // Could not get volume info by given volume name, // since the volume name is loaded from db, @@ -433,7 +558,7 @@ private VolumeList getVolumesByUser(byte[] userNameKey) throws OMException { VolumeList volumes = null; try { - byte[] volumesInBytes = store.get(userNameKey); + byte[] volumesInBytes = getUserTable().get(userNameKey); if (volumesInBytes == null) { // No volume found for this user, return an empty list return VolumeList.newBuilder().build(); @@ -447,32 +572,12 @@ private VolumeList getVolumesByUser(byte[] userNameKey) return volumes; } - private VolumeList getAllVolumes() throws IOException { - // Scan all users in database - KeyPrefixFilter filter = - new KeyPrefixFilter().addFilter(OzoneConsts.OM_USER_PREFIX); - // We are not expecting a huge number of users per cluster, - // it should be fine to scan all users in db and return us a - // list of volume names in string per user. - List> rangeKVs = store - .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter); - - VolumeList.Builder builder = VolumeList.newBuilder(); - for (Map.Entry entry : rangeKVs) { - VolumeList volumes = this.getVolumesByUser(entry.getKey()); - builder.addAllVolumeNames(volumes.getVolumeNamesList()); - } - - return builder.build(); - } - @Override public List getPendingDeletionKeys(final int count) throws IOException { List keyBlocksList = Lists.newArrayList(); - List> rangeResult = - store.getRangeKVs(null, count, - MetadataKeyFilters.getDeletingKeyFilter()); + // TODO: Fix this later, Not part of this patch. + List> rangeResult = Collections.emptyList(); for (Map.Entry entry : rangeResult) { OmKeyInfo info = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); @@ -482,7 +587,7 @@ public List getPendingDeletionKeys(final int count) return Collections.emptyList(); } List item = latest.getLocationList().stream() - .map(b->new BlockID(b.getContainerID(), b.getLocalID())) + .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) .collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder() .setKeyName(DFSUtil.bytes2String(entry.getKey())) @@ -497,11 +602,9 @@ public List getPendingDeletionKeys(final int count) public List getExpiredOpenKeys() throws IOException { List keyBlocksList = Lists.newArrayList(); long now = Time.now(); - final MetadataKeyFilter openKeyFilter = - new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX); - List> rangeResult = - store.getSequentialRangeKVs(null, Integer.MAX_VALUE, - openKeyFilter); + // TODO: Fix the getExpiredOpenKeys, Not part of this patch. + List> rangeResult = Collections.emptyList(); + for (Map.Entry entry : rangeResult) { OmKeyInfo info = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); @@ -513,7 +616,7 @@ public List getExpiredOpenKeys() throws IOException { // Get block keys as a list. List item = info.getLatestVersionLocations() .getBlocksLatestVersionOnly().stream() - .map(b->new BlockID(b.getContainerID(), b.getLocalID())) + .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) .collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder() .setKeyName(DFSUtil.bytes2String(entry.getKey())) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 71fa921cc5..c06508d5df 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -21,14 +21,27 @@ import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -39,36 +52,12 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; -import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .ServicePort; -import org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.OzoneAclInfo; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdds.scm.ScmInfo; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.hdds.scm.protocolPB - .ScmBlockLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; - -import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients; -import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients; -import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled; -import static org.apache.hadoop.ozone.OmUtils.getOmAddress; -import static org.apache.hadoop.hdds.server.ServerUtils - .updateRPCListenAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,18 +70,17 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients; +import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients; +import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; +import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; +import static org.apache.hadoop.ozone.OmUtils.getOmAddress; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.ozone.om.OMConfigKeys - .OZONE_OM_HANDLER_COUNT_KEY; -import static org.apache.hadoop.ozone.protocol.proto - .OzoneManagerProtocolProtos.OzoneManagerService - .newReflectiveBlockingService; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos - .NodeState.HEALTHY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService; import static org.apache.hadoop.util.ExitUtil.terminate; /** @@ -108,33 +96,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl "Usage: \n ozone om [genericOptions] " + "[ " + StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "ozone om [ " + StartupOption.HELP.getName() + " ]\n"; - - /** Startup options. */ - public enum StartupOption { - CREATEOBJECTSTORE("-createObjectStore"), - HELP("-help"), - REGULAR("-regular"); - - private final String name; - - StartupOption(String arg) { - this.name = arg; - } - - public String getName() { - return name; - } - - public static StartupOption parse(String value) { - for (StartupOption option : StartupOption.values()) { - if (option.name.equalsIgnoreCase(value)) { - return option; - } - } - return null; - } - } - private final OzoneConfiguration configuration; private final RPC.Server omRpcServer; private final InetSocketAddress omRpcAddress; @@ -238,20 +199,6 @@ private static StorageContainerLocationProtocol getScmContainerClient( return scmContainerClient; } - @VisibleForTesting - public KeyManager getKeyManager() { - return keyManager; - } - - @VisibleForTesting - public ScmInfo getScmInfo() throws IOException { - return scmBlockClient.getScmInfo(); - } - - @VisibleForTesting - public OMStorage getOmStorage() { - return omStorage; - } /** * Starts an RPC server, if configured. * @@ -260,7 +207,6 @@ public OMStorage getOmStorage() { * @param protocol RPC protocol provided by RPC server * @param instance RPC protocol implementation instance * @param handlerCount RPC server handler count - * * @return RPC server * @throws IOException if there is an I/O error while creating RPC server */ @@ -281,18 +227,6 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf, return rpcServer; } - /** - * Get metadata manager. - * @return metadata manager. - */ - public OMMetadataManager getMetadataManager() { - return metadataManager; - } - - public OMMetrics getMetrics() { - return metrics; - } - /** * Main entry point for starting OzoneManager. * @@ -329,6 +263,7 @@ private static void printUsage(PrintStream out) { /** * Constructs OM instance based on command line arguments. + * * @param argv Command line arguments * @param conf OzoneConfiguration * @return OM instance @@ -336,7 +271,7 @@ private static void printUsage(PrintStream out) { */ public static OzoneManager createOm(String[] argv, - OzoneConfiguration conf) throws IOException { + OzoneConfiguration conf) throws IOException { if (!isHddsEnabled(conf)) { System.err.println("OM cannot be started in secure mode or when " + OZONE_ENABLED + " is set to false"); @@ -363,9 +298,11 @@ public static OzoneManager createOm(String[] argv, /** * Initializes the OM instance. + * * @param conf OzoneConfiguration * @return true if OM initialization succeeds, false otherwise - * @throws IOException in case ozone metadata directory path is not accessible + * @throws IOException in case ozone metadata directory path is not + * accessible */ private static boolean omInit(OzoneConfiguration conf) throws IOException { @@ -406,14 +343,17 @@ private static boolean omInit(OzoneConfiguration conf) throws IOException { /** * Parses the command line options for OM initialization. + * * @param args command line arguments * @return StartupOption if options are valid, null otherwise */ private static StartupOption parseArguments(String[] args) { if (args == null || args.length == 0) { return StartupOption.REGULAR; - } else if (args.length == 1) { - return StartupOption.parse(args[0]); + } else { + if (args.length == 1) { + return StartupOption.parse(args[0]); + } } return null; } @@ -432,6 +372,34 @@ private static String buildRpcServerStartMessage(String description, String.format("%s not started", description); } + @VisibleForTesting + public KeyManager getKeyManager() { + return keyManager; + } + + @VisibleForTesting + public ScmInfo getScmInfo() throws IOException { + return scmBlockClient.getScmInfo(); + } + + @VisibleForTesting + public OMStorage getOmStorage() { + return omStorage; + } + + /** + * Get metadata manager. + * + * @return metadata manager. + */ + public OMMetadataManager getMetadataManager() { + return metadataManager; + } + + public OMMetrics getMetrics() { + return metrics; + } + /** * Start service. */ @@ -533,8 +501,8 @@ public void setQuota(String volume, long quota) throws IOException { * * @param volume - volume * @param userAcl - user acls which needs to be checked for access - * @return true if the user has required access for the volume, - * false otherwise + * @return true if the user has required access for the volume, false + * otherwise * @throws IOException */ @Override @@ -597,7 +565,7 @@ public void deleteVolume(String volume) throws IOException { */ @Override public List listVolumeByUser(String userName, String prefix, - String prevKey, int maxKeys) throws IOException { + String prevKey, int maxKeys) throws IOException { try { metrics.incNumVolumeLists(); return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys); @@ -651,7 +619,7 @@ public void createBucket(OmBucketInfo bucketInfo) throws IOException { */ @Override public List listBuckets(String volumeName, - String startKey, String prefix, int maxNumOfBuckets) + String startKey, String prefix, int maxNumOfBuckets) throws IOException { try { metrics.incNumBucketLists(); @@ -702,7 +670,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } @Override - public void commitKey(OmKeyArgs args, int clientID) + public void commitKey(OmKeyArgs args, long clientID) throws IOException { try { metrics.incNumKeyCommits(); @@ -714,7 +682,7 @@ public void commitKey(OmKeyArgs args, int clientID) } @Override - public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) throws IOException { try { metrics.incNumBlockAllocateCalls(); @@ -773,7 +741,7 @@ public void deleteKey(OmKeyArgs args) throws IOException { @Override public List listKeys(String volumeName, String bucketName, - String startKey, String keyPrefix, int maxKeys) throws IOException { + String startKey, String keyPrefix, int maxKeys) throws IOException { try { metrics.incNumKeyLists(); return keyManager.listKeys(volumeName, bucketName, @@ -786,6 +754,7 @@ public List listKeys(String volumeName, String bucketName, /** * Sets bucket property from args. + * * @param args - BucketArgs. * @throws IOException */ @@ -801,9 +770,9 @@ public void setBucketProperty(OmBucketArgs args) } } - /** * Deletes an existing empty bucket from volume. + * * @param volume - Name of the volume. * @param bucket - Name of the bucket. * @throws IOException @@ -853,8 +822,8 @@ public List getServiceList() throws IOException { .setNodeType(HddsProtos.NodeType.OM) .setHostname(omRpcAddress.getHostName()) .addServicePort(ServicePort.newBuilder() - .setType(ServicePort.Type.RPC) - .setValue(omRpcAddress.getPort()) + .setType(ServicePort.Type.RPC) + .setValue(omRpcAddress.getPort()) .build()); if (httpServer.getHttpAddress() != null) { omServiceInfoBuilder.addServicePort(ServicePort.newBuilder() @@ -908,4 +877,32 @@ public List getServiceList() throws IOException { // metrics.incNumGetServiceListFails() return services; } + + /** + * Startup options. + */ + public enum StartupOption { + CREATEOBJECTSTORE("-createObjectStore"), + HELP("-help"), + REGULAR("-regular"); + + private final String name; + + StartupOption(String arg) { + this.name = arg; + } + + public static StartupOption parse(String value) { + for (StartupOption option : StartupOption.values()) { + if (option.name.equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + + public String getName() { + return name; + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java index e50145debd..419b0aaf18 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java @@ -28,7 +28,9 @@ .OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.RocksDBStore; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,10 +69,10 @@ public VolumeManagerImpl(OMMetadataManager metadataManager, // Helpers to add and delete volume from user list private void addVolumeToOwnerList(String volume, String owner, - BatchOperation batchOperation) throws IOException { + WriteBatch batchOperation) throws RocksDBException, IOException { // Get the volume list byte[] dbUserKey = metadataManager.getUserKey(owner); - byte[] volumeList = metadataManager.get(dbUserKey); + byte[] volumeList = metadataManager.getUserTable().get(dbUserKey); List prevVolList = new LinkedList<>(); if (volumeList != null) { VolumeList vlist = VolumeList.parseFrom(volumeList); @@ -87,15 +89,15 @@ private void addVolumeToOwnerList(String volume, String owner, prevVolList.add(volume); VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - batchOperation.put(dbUserKey, newVolList.toByteArray()); + batchOperation.put(metadataManager.getUserTable().getHandle(), + dbUserKey, newVolList.toByteArray()); } private void delVolumeFromOwnerList(String volume, String owner, - BatchOperation batchOperation) - throws IOException { + WriteBatch batch) throws RocksDBException, IOException { // Get the volume list byte[] dbUserKey = metadataManager.getUserKey(owner); - byte[] volumeList = metadataManager.get(dbUserKey); + byte[] volumeList = metadataManager.getUserTable().get(dbUserKey); List prevVolList = new LinkedList<>(); if (volumeList != null) { VolumeList vlist = VolumeList.parseFrom(volumeList); @@ -108,11 +110,12 @@ private void delVolumeFromOwnerList(String volume, String owner, // Remove the volume from the list prevVolList.remove(volume); if (prevVolList.size() == 0) { - batchOperation.delete(dbUserKey); + batch.delete(dbUserKey); } else { VolumeList newVolList = VolumeList.newBuilder() .addAllVolumeNames(prevVolList).build(); - batchOperation.put(dbUserKey, newVolList.toByteArray()); + batch.put(metadataManager.getUserTable().getHandle(), + dbUserKey, newVolList.toByteArray()); } } @@ -126,7 +129,7 @@ public void createVolume(OmVolumeArgs args) throws IOException { metadataManager.writeLock().lock(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume()); - byte[] volumeInfo = metadataManager.get(dbVolumeKey); + byte[] volumeInfo = metadataManager.getVolumeTable().get(dbVolumeKey); // Check of the volume already exists if (volumeInfo != null) { @@ -134,37 +137,45 @@ public void createVolume(OmVolumeArgs args) throws IOException { throw new OMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS); } - BatchOperation batch = new BatchOperation(); - // Write the vol info - List metadataList = new LinkedList<>(); - for (Map.Entry entry : args.getKeyValueMap().entrySet()) { - metadataList.add(HddsProtos.KeyValue.newBuilder() - .setKey(entry.getKey()).setValue(entry.getValue()).build()); + try(WriteBatch batch = new WriteBatch()) { + // Write the vol info + List metadataList = new LinkedList<>(); + for (Map.Entry entry : + args.getKeyValueMap().entrySet()) { + metadataList.add(HddsProtos.KeyValue.newBuilder() + .setKey(entry.getKey()).setValue(entry.getValue()).build()); + } + List aclList = args.getAclMap().ozoneAclGetProtobuf(); + + VolumeInfo newVolumeInfo = VolumeInfo.newBuilder() + .setAdminName(args.getAdminName()) + .setOwnerName(args.getOwnerName()) + .setVolume(args.getVolume()) + .setQuotaInBytes(args.getQuotaInBytes()) + .addAllMetadata(metadataList) + .addAllVolumeAcls(aclList) + .setCreationTime(Time.now()) + .build(); + batch.put(metadataManager.getVolumeTable().getHandle(), + dbVolumeKey, newVolumeInfo.toByteArray()); + + // Add volume to user list + addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); + metadataManager.getStore().write(batch); } - List aclList = args.getAclMap().ozoneAclGetProtobuf(); - - VolumeInfo newVolumeInfo = VolumeInfo.newBuilder() - .setAdminName(args.getAdminName()) - .setOwnerName(args.getOwnerName()) - .setVolume(args.getVolume()) - .setQuotaInBytes(args.getQuotaInBytes()) - .addAllMetadata(metadataList) - .addAllVolumeAcls(aclList) - .setCreationTime(Time.now()) - .build(); - batch.put(dbVolumeKey, newVolumeInfo.toByteArray()); - - // Add volume to user list - addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); - metadataManager.writeBatch(batch); LOG.debug("created volume:{} user:{}", args.getVolume(), args.getOwnerName()); - } catch (IOException ex) { + } catch (RocksDBException | IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Volume creation failed for user:{} volume:{}", args.getOwnerName(), args.getVolume(), ex); } - throw ex; + if(ex instanceof RocksDBException) { + throw RocksDBStore.toIOException("Volume creation failed.", + (RocksDBException) ex); + } else { + throw (IOException) ex; + } } finally { metadataManager.writeLock().unlock(); } @@ -184,7 +195,7 @@ public void setOwner(String volume, String owner) throws IOException { metadataManager.writeLock().lock(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); + byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey); if (volInfo == null) { LOG.debug("Changing volume ownership failed for user:{} volume:{}", owner, volume); @@ -195,28 +206,34 @@ public void setOwner(String volume, String owner) throws IOException { OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo); Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - BatchOperation batch = new BatchOperation(); - delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); - addVolumeToOwnerList(volume, owner, batch); + try(WriteBatch batch = new WriteBatch()) { + delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); + addVolumeToOwnerList(volume, owner, batch); - OmVolumeArgs newVolumeArgs = - OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume()) - .setAdminName(volumeArgs.getAdminName()) - .setOwnerName(owner) - .setQuotaInBytes(volumeArgs.getQuotaInBytes()) - .setCreationTime(volumeArgs.getCreationTime()) - .build(); + OmVolumeArgs newVolumeArgs = + OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume()) + .setAdminName(volumeArgs.getAdminName()) + .setOwnerName(owner) + .setQuotaInBytes(volumeArgs.getQuotaInBytes()) + .setCreationTime(volumeArgs.getCreationTime()) + .build(); - VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); - batch.put(dbVolumeKey, newVolumeInfo.toByteArray()); - - metadataManager.writeBatch(batch); - } catch (IOException ex) { + VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); + batch.put(metadataManager.getVolumeTable().getHandle(), + dbVolumeKey, newVolumeInfo.toByteArray()); + metadataManager.getStore().write(batch); + } + } catch (RocksDBException | IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Changing volume ownership failed for user:{} volume:{}", owner, volume, ex); } - throw ex; + if(ex instanceof RocksDBException) { + throw RocksDBStore.toIOException("Volume creation failed.", + (RocksDBException) ex); + } else { + throw (IOException) ex; + } } finally { metadataManager.writeLock().unlock(); } @@ -234,7 +251,7 @@ public void setQuota(String volume, long quota) throws IOException { metadataManager.writeLock().lock(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); + byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey); if (volInfo == null) { LOG.debug("volume:{} does not exist", volume); throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); @@ -253,7 +270,8 @@ public void setQuota(String volume, long quota) throws IOException { .setCreationTime(volumeArgs.getCreationTime()).build(); VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); - metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray()); + metadataManager.getVolumeTable().put(dbVolumeKey, + newVolumeInfo.toByteArray()); } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Changing volume quota failed for volume:{} quota:{}", volume, @@ -276,7 +294,7 @@ public OmVolumeArgs getVolumeInfo(String volume) throws IOException { metadataManager.readLock().lock(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); + byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey); if (volInfo == null) { LOG.debug("volume:{} does not exist", volume); throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); @@ -307,9 +325,9 @@ public void deleteVolume(String volume) throws IOException { Preconditions.checkNotNull(volume); metadataManager.writeLock().lock(); try { - BatchOperation batch = new BatchOperation(); + byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); + byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey); if (volInfo == null) { LOG.debug("volume:{} does not exist", volume); throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); @@ -324,14 +342,22 @@ public void deleteVolume(String volume) throws IOException { Preconditions.checkState(volume.equals(volumeInfo.getVolume())); // delete the volume from the owner list // as well as delete the volume entry - delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch); - batch.delete(dbVolumeKey); - metadataManager.writeBatch(batch); - } catch (IOException ex) { + try(WriteBatch batch = new WriteBatch()) { + delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch); + batch.delete(metadataManager.getVolumeTable().getHandle(), + dbVolumeKey); + metadataManager.getStore().write(batch); + } + } catch (RocksDBException| IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Delete volume failed for volume:{}", volume, ex); } - throw ex; + if(ex instanceof RocksDBException) { + throw RocksDBStore.toIOException("Volume creation failed.", + (RocksDBException) ex); + } else { + throw (IOException) ex; + } } finally { metadataManager.writeLock().unlock(); } @@ -352,7 +378,7 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) metadataManager.readLock().lock(); try { byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); + byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey); if (volInfo == null) { LOG.debug("volume:{} does not exist", volume); throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); @@ -378,7 +404,7 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) */ @Override public List listVolumes(String userName, - String prefix, String startKey, int maxKeys) throws IOException { + String prefix, String startKey, int maxKeys) throws IOException { metadataManager.readLock().lock(); try { return metadataManager.listVolumes( diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 45ec2d0412..06d782b820 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -526,8 +526,7 @@ public CommitKeyResponse commitKey(RpcController controller, .setFactor(factor) .setDataSize(keyArgs.getDataSize()) .build(); - int id = request.getClientID(); - impl.commitKey(omKeyArgs, id); + impl.commitKey(omKeyArgs, request.getClientID()); resp.setStatus(Status.OK); } catch (IOException e) { resp.setStatus(exceptionToResponseStatus(e)); @@ -547,8 +546,8 @@ public AllocateBlockResponse allocateBlock(RpcController controller, .setBucketName(keyArgs.getBucketName()) .setKeyName(keyArgs.getKeyName()) .build(); - int id = request.getClientID(); - OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs, id); + OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs, + request.getClientID()); resp.setKeyLocation(newLocation.getProtobuf()); resp.setStatus(Status.OK); } catch (IOException e) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java index 1ecac7fdac..9684a1f222 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java @@ -17,33 +17,26 @@ package org.apache.hadoop.ozone.om; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; -import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; +import java.io.File; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.LinkedList; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.mockito.Mockito.any; +import java.util.List; /** * Tests BucketManagerImpl, mocks OMMetadataManager for testing. @@ -53,86 +46,35 @@ public class TestBucketManagerImpl { @Rule public ExpectedException thrown = ExpectedException.none(); - private OMMetadataManager getMetadataManagerMock(String... volumesToCreate) - throws IOException { - OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class); - Map metadataDB = new HashMap<>(); - ReadWriteLock lock = new ReentrantReadWriteLock(); + @Rule + public TemporaryFolder folder = new TemporaryFolder(); - Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock()); - Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock()); - Mockito.when(metadataManager.getVolumeKey(any(String.class))).thenAnswer( - (InvocationOnMock invocation) -> - DFSUtil.string2Bytes( - OzoneConsts.OM_VOLUME_PREFIX + invocation.getArguments()[0])); - Mockito.when(metadataManager - .getBucketKey(any(String.class), any(String.class))).thenAnswer( - (InvocationOnMock invocation) -> - DFSUtil.string2Bytes( - OzoneConsts.OM_VOLUME_PREFIX - + invocation.getArguments()[0] - + OzoneConsts.OM_BUCKET_PREFIX - + invocation.getArguments()[1])); - - Mockito.doAnswer( - new Answer() { - @Override - public Boolean answer(InvocationOnMock invocation) - throws Throwable { - String keyRootName = OzoneConsts.OM_KEY_PREFIX - + invocation.getArguments()[0] - + OzoneConsts.OM_KEY_PREFIX - + invocation.getArguments()[1] - + OzoneConsts.OM_KEY_PREFIX; - Iterator keyIterator = metadataDB.keySet().iterator(); - while(keyIterator.hasNext()) { - if(keyIterator.next().startsWith(keyRootName)) { - return false; - } - } - return true; - } - }).when(metadataManager).isBucketEmpty(any(String.class), - any(String.class)); - - Mockito.doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - metadataDB.put(DFSUtil.bytes2String( - (byte[])invocation.getArguments()[0]), - (byte[])invocation.getArguments()[1]); - return null; - } - }).when(metadataManager).put(any(byte[].class), any(byte[].class)); - - Mockito.when(metadataManager.get(any(byte[].class))).thenAnswer( - (InvocationOnMock invocation) -> - metadataDB.get(DFSUtil.bytes2String( - (byte[])invocation.getArguments()[0])) - ); - Mockito.doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - metadataDB.remove(DFSUtil.bytes2String( - (byte[])invocation.getArguments()[0])); - return null; - } - }).when(metadataManager).delete(any(byte[].class)); - - for(String volumeName : volumesToCreate) { - byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName); - metadataDB.put(OzoneConsts.OM_VOLUME_PREFIX + volumeName, - dummyVolumeInfo); + private OzoneConfiguration createNewTestPath() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + File newFolder = folder.newFolder(); + if (!newFolder.exists()) { + Assert.assertTrue(newFolder.mkdirs()); } - return metadataManager; + ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString()); + return conf; + } + + private OmMetadataManagerImpl createSampleVol() throws IOException { + OzoneConfiguration conf = createNewTestPath(); + OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf); + byte[] volumeKey = metaMgr.getVolumeKey("sampleVol"); + // This is a simple hack for testing, we just test if the volume via a + // null check, do not parse the value part. So just write some dummy value. + metaMgr.getVolumeTable().put(volumeKey, volumeKey); + return metaMgr; } @Test - public void testCreateBucketWithoutVolume() throws IOException { + public void testCreateBucketWithoutVolume() throws Exception { thrown.expectMessage("Volume doesn't exist"); - OMMetadataManager metaMgr = getMetadataManagerMock(); + OzoneConfiguration conf = createNewTestPath(); + OmMetadataManagerImpl metaMgr = + new OmMetadataManagerImpl(conf); try { BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() @@ -140,29 +82,35 @@ public void testCreateBucketWithoutVolume() throws IOException { .setBucketName("bucketOne") .build(); bucketManager.createBucket(bucketInfo); - } catch(OMException omEx) { + } catch (OMException omEx) { Assert.assertEquals(ResultCodes.FAILED_VOLUME_NOT_FOUND, omEx.getResult()); throw omEx; + } finally { + metaMgr.getStore().close(); } } @Test - public void testCreateBucket() throws IOException { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + public void testCreateBucket() throws Exception { + OmMetadataManagerImpl metaMgr = createSampleVol(); + BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() .setVolumeName("sampleVol") .setBucketName("bucketOne") .build(); bucketManager.createBucket(bucketInfo); - Assert.assertNotNull(bucketManager.getBucketInfo("sampleVol", "bucketOne")); + Assert.assertNotNull(bucketManager.getBucketInfo("sampleVol", + "bucketOne")); + metaMgr.getStore().close(); } @Test - public void testCreateAlreadyExistingBucket() throws IOException { + public void testCreateAlreadyExistingBucket() throws Exception { thrown.expectMessage("Bucket already exist"); - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + OmMetadataManagerImpl metaMgr = createSampleVol(); + try { BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() @@ -171,30 +119,37 @@ public void testCreateAlreadyExistingBucket() throws IOException { .build(); bucketManager.createBucket(bucketInfo); bucketManager.createBucket(bucketInfo); - } catch(OMException omEx) { + } catch (OMException omEx) { Assert.assertEquals(ResultCodes.FAILED_BUCKET_ALREADY_EXISTS, omEx.getResult()); throw omEx; + } finally { + metaMgr.getStore().close(); } } @Test - public void testGetBucketInfoForInvalidBucket() throws IOException { + public void testGetBucketInfoForInvalidBucket() throws Exception { thrown.expectMessage("Bucket not found"); + OmMetadataManagerImpl metaMgr = createSampleVol(); try { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + + BucketManager bucketManager = new BucketManagerImpl(metaMgr); bucketManager.getBucketInfo("sampleVol", "bucketOne"); - } catch(OMException omEx) { + } catch (OMException omEx) { Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_FOUND, omEx.getResult()); throw omEx; + } finally { + metaMgr.getStore().close(); } } @Test - public void testGetBucketInfo() throws IOException { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + public void testGetBucketInfo() throws Exception { + OmMetadataManagerImpl metaMgr = createSampleVol(); + BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() .setVolumeName("sampleVol") @@ -210,11 +165,13 @@ public void testGetBucketInfo() throws IOException { Assert.assertEquals(StorageType.DISK, result.getStorageType()); Assert.assertEquals(false, result.getIsVersionEnabled()); + metaMgr.getStore().close(); } @Test - public void testSetBucketPropertyAddACL() throws IOException { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + public void testSetBucketPropertyAddACL() throws Exception { + OmMetadataManagerImpl metaMgr = createSampleVol(); + List acls = new LinkedList<>(); OzoneAcl ozoneAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, "root", OzoneAcl.OzoneACLRights.READ); @@ -247,11 +204,13 @@ public void testSetBucketPropertyAddACL() throws IOException { "sampleVol", "bucketOne"); Assert.assertEquals(2, updatedResult.getAcls().size()); Assert.assertTrue(updatedResult.getAcls().contains(newAcl)); + metaMgr.getStore().close(); } @Test - public void testSetBucketPropertyRemoveACL() throws IOException { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + public void testSetBucketPropertyRemoveACL() throws Exception { + OmMetadataManagerImpl metaMgr = createSampleVol(); + List acls = new LinkedList<>(); OzoneAcl aclOne = new OzoneAcl(OzoneAcl.OzoneACLType.USER, "root", OzoneAcl.OzoneACLRights.READ); @@ -283,11 +242,13 @@ public void testSetBucketPropertyRemoveACL() throws IOException { "sampleVol", "bucketOne"); Assert.assertEquals(1, updatedResult.getAcls().size()); Assert.assertFalse(updatedResult.getAcls().contains(aclTwo)); + metaMgr.getStore().close(); } @Test - public void testSetBucketPropertyChangeStorageType() throws IOException { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + public void testSetBucketPropertyChangeStorageType() throws Exception { + OmMetadataManagerImpl metaMgr = createSampleVol(); + BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() .setVolumeName("sampleVol") @@ -309,11 +270,13 @@ public void testSetBucketPropertyChangeStorageType() throws IOException { "sampleVol", "bucketOne"); Assert.assertEquals(StorageType.SSD, updatedResult.getStorageType()); + metaMgr.getStore().close(); } @Test - public void testSetBucketPropertyChangeVersioning() throws IOException { - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + public void testSetBucketPropertyChangeVersioning() throws Exception { + OmMetadataManagerImpl metaMgr = createSampleVol(); + BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() .setVolumeName("sampleVol") @@ -333,21 +296,22 @@ public void testSetBucketPropertyChangeVersioning() throws IOException { OmBucketInfo updatedResult = bucketManager.getBucketInfo( "sampleVol", "bucketOne"); Assert.assertTrue(updatedResult.getIsVersionEnabled()); + metaMgr.getStore().close(); } @Test - public void testDeleteBucket() throws IOException { + public void testDeleteBucket() throws Exception { thrown.expectMessage("Bucket not found"); - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + OmMetadataManagerImpl metaMgr = createSampleVol(); BucketManager bucketManager = new BucketManagerImpl(metaMgr); - for(int i = 0; i < 5; i++) { + for (int i = 0; i < 5; i++) { OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() .setVolumeName("sampleVol") .setBucketName("bucket_" + i) .build(); bucketManager.createBucket(bucketInfo); } - for(int i = 0; i < 5; i++) { + for (int i = 0; i < 5; i++) { Assert.assertEquals("bucket_" + i, bucketManager.getBucketInfo( "sampleVol", "bucket_" + i).getBucketName()); @@ -356,22 +320,23 @@ public void testDeleteBucket() throws IOException { bucketManager.deleteBucket("sampleVol", "bucket_1"); Assert.assertNotNull(bucketManager.getBucketInfo( "sampleVol", "bucket_2")); - } catch(IOException ex) { + } catch (IOException ex) { Assert.fail(ex.getMessage()); } try { bucketManager.getBucketInfo("sampleVol", "bucket_1"); - } catch(OMException omEx) { + } catch (OMException omEx) { Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_FOUND, omEx.getResult()); throw omEx; } + metaMgr.getStore().close(); } @Test - public void testDeleteNonEmptyBucket() throws IOException { + public void testDeleteNonEmptyBucket() throws Exception { thrown.expectMessage("Bucket is not empty"); - OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol"); + OmMetadataManagerImpl metaMgr = createSampleVol(); BucketManager bucketManager = new BucketManagerImpl(metaMgr); OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() .setVolumeName("sampleVol") @@ -379,16 +344,19 @@ public void testDeleteNonEmptyBucket() throws IOException { .build(); bucketManager.createBucket(bucketInfo); //Create keys in bucket - metaMgr.put(DFSUtil.string2Bytes("/sampleVol/bucketOne/key_one"), + metaMgr.getKeyTable().put(DFSUtil.string2Bytes("/sampleVol/bucketOne" + + "/key_one"), DFSUtil.string2Bytes("value_one")); - metaMgr.put(DFSUtil.string2Bytes("/sampleVol/bucketOne/key_two"), + metaMgr.getKeyTable().put(DFSUtil.string2Bytes("/sampleVol/bucketOne" + + "/key_two"), DFSUtil.string2Bytes("value_two")); try { bucketManager.deleteBucket("sampleVol", "bucketOne"); - } catch(OMException omEx) { + } catch (OMException omEx) { Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_EMPTY, omEx.getResult()); throw omEx; } + metaMgr.getStore().close(); } } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java index 51018a1929..080840a422 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java @@ -57,9 +57,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_BUCKET_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_VOLUME_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB; /** @@ -412,12 +411,15 @@ private void insertOMDB(Connection conn, KeyType type, String keyName, } } + // TODO: This has to be fixed. + // we don't have prefix anymore. now each key is written into different + // table. The logic has to be changed. private KeyType getKeyType(String key) { if (key.startsWith(OM_USER_PREFIX)) { return KeyType.USER; - } else if (key.startsWith(OM_VOLUME_PREFIX)) { - return key.replaceFirst(OM_VOLUME_PREFIX, "") - .contains(OM_BUCKET_PREFIX) ? KeyType.BUCKET : KeyType.VOLUME; + } else if (key.startsWith(OM_KEY_PREFIX)) { + return key.replaceFirst(OM_KEY_PREFIX, "") + .contains(OM_KEY_PREFIX) ? KeyType.BUCKET : KeyType.VOLUME; }else { return KeyType.KEY; }