diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java
new file mode 100644
index 0000000000..1ef64d0c33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class that encapsulates Bucket Arguments.
+ */
+public final class KsmBucketArgs {
+ /**
+ * Name of the volume in which the bucket belongs to.
+ */
+ private final String volumeName;
+ /**
+ * Name of the bucket.
+ */
+ private final String bucketName;
+ /**
+ * ACL's that are to be added for the bucket.
+ */
+ private List addAcls;
+ /**
+ * ACL's that are to be removed from the bucket.
+ */
+ private List removeAcls;
+ /**
+ * Bucket Version flag.
+ */
+ private boolean isVersionEnabled;
+ /**
+ * Type of storage to be used for this bucket.
+ * [RAM_DISK, SSD, DISK, ARCHIVE]
+ */
+ private StorageType storageType;
+
+ /**
+ * Private constructor, constructed via builder.
+ * @param volumeName - Volume name.
+ * @param bucketName - Bucket name.
+ * @param addAcls - ACL's to be added.
+ * @param removeAcls - ACL's to be removed.
+ * @param isVersionEnabled - Bucket version flag.
+ * @param storageType - Storage type to be used.
+ */
+ private KsmBucketArgs(String volumeName, String bucketName,
+ List addAcls, List removeAcls,
+ boolean isVersionEnabled, StorageType storageType) {
+ this.volumeName = volumeName;
+ this.bucketName = bucketName;
+ this.addAcls = addAcls;
+ this.removeAcls = removeAcls;
+ this.isVersionEnabled = isVersionEnabled;
+ this.storageType = storageType;
+ }
+
+ /**
+ * Returns the Volume Name.
+ * @return String.
+ */
+ public String getVolumeName() {
+ return volumeName;
+ }
+
+ /**
+ * Returns the Bucket Name.
+ * @return String
+ */
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ /**
+ * Returns the ACL's that are to be added.
+ * @return List
+ */
+ public List getAddAcls() {
+ return addAcls;
+ }
+
+ /**
+ * Returns the ACL's that are to be removed.
+ * @return List
+ */
+ public List getRemoveAcls() {
+ return removeAcls;
+ }
+
+ /**
+ * Returns true if bucket version is enabled, else false.
+ * @return isVersionEnabled
+ */
+ public boolean getIsVersionEnabled() {
+ return isVersionEnabled;
+ }
+
+ /**
+ * Returns the type of storage to be used.
+ * @return StorageType
+ */
+ public StorageType getStorageType() {
+ return storageType;
+ }
+
+ /**
+ * Returns new builder class that builds a KsmBucketArgs.
+ *
+ * @return Builder
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for KsmBucketArgs.
+ */
+ public static class Builder {
+ private String volumeName;
+ private String bucketName;
+ private List addAcls;
+ private List removeAcls;
+ private boolean isVersionEnabled;
+ private StorageType storageType;
+
+ Builder() {
+ addAcls = new LinkedList<>();
+ removeAcls = new LinkedList<>();
+ }
+
+ public Builder setVolumeName(String volume) {
+ this.volumeName = volume;
+ return this;
+ }
+
+ public Builder setBucketName(String bucket) {
+ this.bucketName = bucket;
+ return this;
+ }
+
+ public Builder addAddAcl(OzoneAclInfo acl) {
+ this.addAcls.add(acl);
+ return this;
+ }
+
+ public Builder addRemoveAcl(OzoneAclInfo acl) {
+ this.removeAcls.add(acl);
+ return this;
+ }
+
+ public Builder setIsVersionEnabled(boolean versionFlag) {
+ this.isVersionEnabled = versionFlag;
+ return this;
+ }
+
+ public Builder setStorageType(StorageType storage) {
+ this.storageType = storage;
+ return this;
+ }
+
+ /**
+ * Constructs the KsmBucketArgs.
+ * @return instance of KsmBucketArgs.
+ */
+ public KsmBucketArgs build() {
+ Preconditions.checkNotNull(volumeName);
+ Preconditions.checkNotNull(bucketName);
+ Preconditions.checkNotNull(isVersionEnabled);
+ return new KsmBucketArgs(volumeName, bucketName, addAcls, removeAcls,
+ isVersionEnabled, storageType);
+ }
+ }
+
+ /**
+ * Creates BucketInfo protobuf from KsmBucketArgs.
+ */
+ public BucketInfo getProtobuf() {
+ return BucketInfo.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .addAllAddAcls(addAcls)
+ .addAllRemoveAcls(removeAcls)
+ .setIsVersionEnabled(isVersionEnabled)
+ .setStorageType(PBHelperClient.convertStorageType(storageType))
+ .build();
+ }
+
+ /**
+ * Parses BucketInfo protobuf and creates KsmBucketArgs.
+ * @param bucketInfo
+ * @return instance of KsmBucketArgs
+ */
+ public static KsmBucketArgs getFromProtobuf(BucketInfo bucketInfo) {
+ return new KsmBucketArgs(
+ bucketInfo.getVolumeName(),
+ bucketInfo.getBucketName(),
+ bucketInfo.getAddAclsList(),
+ bucketInfo.getRemoveAclsList(),
+ bucketInfo.getIsVersionEnabled(),
+ PBHelperClient.convertStorageType(
+ bucketInfo.getStorageType()));
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
similarity index 92%
rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
index 546b6c3a70..cd5d0c93d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ksm.protocol;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import java.io.IOException;
import java.util.List;
@@ -24,7 +25,7 @@
/**
* Protocol to talk to KSM.
*/
-public interface KeyspaceManagerProtocol {
+public interface KeySpaceManagerProtocol {
/**
* Creates a volume.
@@ -94,4 +95,12 @@ List listVolumeByUser(String userName, String prefix, String
*/
List listAllVolumes(String prefix, String
prevKey, long maxKeys) throws IOException;
+
+ /**
+ * Creates a bucket.
+ * @param args - Arguments to create Bucket.
+ * @throws IOException
+ */
+ void createBucket(KsmBucketArgs args) throws IOException;
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
index beb8b067ee..da13426b6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
@@ -22,8 +22,15 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
+import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.CreateBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
@@ -38,12 +45,12 @@
import java.util.List;
/**
- * The client side implementation of KeyspaceManagerProtocol.
+ * The client side implementation of KeySpaceManagerProtocol.
*/
@InterfaceAudience.Private
public final class KeySpaceManagerProtocolClientSideTranslatorPB
- implements KeyspaceManagerProtocol, ProtocolTranslator, Closeable {
+ implements KeySpaceManagerProtocol, ProtocolTranslator, Closeable {
/**
* RpcController is not used and hence is set to null.
@@ -199,6 +206,32 @@ public List listAllVolumes(String prefix, String prevKey, long
return null;
}
+ /**
+ * Creates a bucket.
+ *
+ * @param args - Arguments to create Bucket.
+ * @throws IOException
+ */
+ @Override
+ public void createBucket(KsmBucketArgs args) throws IOException {
+ CreateBucketRequest.Builder req =
+ CreateBucketRequest.newBuilder();
+ BucketInfo bucketInfo = args.getProtobuf();
+ req.setBucketInfo(bucketInfo);
+
+ final CreateBucketResponse resp;
+ try {
+ resp = rpcProxy.createBucket(NULL_RPC_CONTROLLER,
+ req.build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ if (resp.getStatus() != Status.OK) {
+ throw new IOException("Bucket creation failed, error: "
+ + resp.getStatus());
+ }
+ }
+
/**
* Return the proxy object underlying this protocol translator.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
index 1490008d67..8b960a949a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
@@ -19,15 +19,16 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyspaceManagerService;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.KeySpaceManagerService;
/**
* Protocol used to communicate with KSM.
*/
@ProtocolInfo(protocolName =
- "org.apache.hadoop.ozone.protocol.KeyspaceManagerProtocol",
+ "org.apache.hadoop.ozone.protocol.KeySpaceManagerProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface KeySpaceManagerProtocolPB
- extends KeyspaceManagerService.BlockingInterface {
+ extends KeySpaceManagerService.BlockingInterface {
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index 4ce7275a5e..ca3d0222b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -34,6 +34,7 @@ Ozone key space manager. Ozone KSM manages the namespace for ozone.
This is similar to Namenode for Ozone.
*/
+import "hdfs.proto";
import "Ozone.proto";
enum Status {
@@ -44,8 +45,11 @@ enum Status {
VOLUME_ALREADY_EXISTS = 5;
USER_NOT_FOUND = 6;
USER_TOO_MANY_VOLUMES = 7;
- ACCESS_DENIED = 8;
- INTERNAL_ERROR = 9;
+ BUCKET_NOT_FOUND = 8;
+ BUCKET_NOT_EMPTY = 9;
+ BUCKET_ALREADY_EXISTS = 10;
+ ACCESS_DENIED = 11;
+ INTERNAL_ERROR = 12;
}
@@ -154,10 +158,42 @@ message ListVolumeResponse {
repeated VolumeInfo volumeInfo = 2;
}
+message BucketInfo {
+ required string volumeName = 1;
+ required string bucketName = 2;
+ repeated OzoneAclInfo addAcls = 3;
+ repeated OzoneAclInfo removeAcls = 4;
+ required bool isVersionEnabled = 5 [default = false];
+ optional StorageTypeProto storageType = 6 [default = DISK];
+}
+
+message OzoneAclInfo {
+ enum OzoneAclType {
+ USER = 1;
+ GROUP = 2;
+ WORLD = 3;
+ }
+ enum OzoneAclRights {
+ READ = 1;
+ WRITE = 2;
+ READ_WRITE = 3;
+ }
+ required OzoneAclType type = 1;
+ required string name = 2;
+ required OzoneAclRights rights = 3;
+}
+
+message CreateBucketRequest {
+ required BucketInfo bucketInfo = 1;
+}
+
+message CreateBucketResponse {
+ required Status status = 1;
+}
/**
The KSM service that takes care of Ozone namespace.
*/
-service KeyspaceManagerService {
+service KeySpaceManagerService {
/**
Creates a Volume.
@@ -193,4 +229,10 @@ service KeyspaceManagerService {
*/
rpc listVolumes(ListVolumeRequest)
returns (ListVolumeResponse);
+
+ /**
+ Creates a Bucket.
+ */
+ rpc createBucket(CreateBucketRequest)
+ returns(CreateBucketResponse);
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 5f960b2171..2ecd67d3d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -93,6 +93,11 @@ public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
public static final String OZONE_HANDLER_LOCAL = "local";
+ /**
+ * Ozone metadata key delimiter.
+ */
+ public static final String DB_KEY_DELIMITER = "/";
+
private OzoneConsts() {
// Never Constructed
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
new file mode 100644
index 0000000000..b9dd551d7d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+
+/**
+ * BucketManager handles all the bucket level operations.
+ */
+public interface BucketManager {
+ /**
+ * Creates a bucket.
+ * @param args - KsmBucketArgs for creating bucket.
+ */
+ void createBucket(KsmBucketArgs args) throws KSMException;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
new file mode 100644
index 0000000000..b0368ea0f1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.DB_KEY_DELIMITER;
+
+/**
+ * KSM bucket manager.
+ */
+public class BucketManagerImpl implements BucketManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BucketManagerImpl.class);
+
+ /**
+ * MetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
+ */
+ private final MetadataManager metadataManager;
+
+ /**
+ * Constructs BucketManager.
+ * @param metadataManager
+ */
+ public BucketManagerImpl(MetadataManager metadataManager){
+ this.metadataManager = metadataManager;
+ }
+
+ /**
+ * MetadataDB is maintained in MetadataManager and shared between
+ * BucketManager and VolumeManager. (and also by KeyManager)
+ *
+ * BucketManager uses MetadataDB to store bucket level information.
+ *
+ * Keys used in BucketManager for storing data into MetadataDB
+ * for BucketInfo:
+ * {volume/bucket} -> bucketInfo
+ *
+ * Work flow of create bucket:
+ *
+ * -> Check if the Volume exists in metadataDB, if not throw
+ * VolumeNotFoundException.
+ * -> Else check if the Bucket exists in metadataDB, if so throw
+ * BucketExistException
+ * -> Else update MetadataDB with VolumeInfo.
+ */
+
+ /**
+ * Creates a bucket.
+ * @param args - KsmBucketArgs.
+ */
+ @Override
+ public void createBucket(KsmBucketArgs args) throws KSMException {
+ Preconditions.checkNotNull(args);
+ metadataManager.writeLock().lock();
+ String volumeNameString = args.getVolumeName();
+ String bucketNameString = args.getBucketName();
+ try {
+ //bucket key: {volume/bucket}
+ String bucketKeyString = volumeNameString +
+ DB_KEY_DELIMITER + bucketNameString;
+
+ byte[] volumeName = DFSUtil.string2Bytes(volumeNameString);
+ byte[] bucketKey = DFSUtil.string2Bytes(bucketKeyString);
+
+ //Check if the volume exists
+ if(metadataManager.get(volumeName) == null) {
+ LOG.error("volume: {} not found ", volumeNameString);
+ throw new KSMException("Volume doesn't exist",
+ KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+ }
+ //Check if bucket already exists
+ if(metadataManager.get(bucketKey) != null) {
+ LOG.error("bucket: {} already exists ", bucketNameString);
+ throw new KSMException("Bucket already exist",
+ KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
+ }
+ metadataManager.put(bucketKey, args.getProtobuf().toByteArray());
+
+ LOG.info("created bucket: {} in volume: {}", bucketNameString,
+ volumeNameString);
+ } catch (DBException ex) {
+ LOG.error("Bucket creation failed for bucket:{} in volume:{}",
+ volumeNameString, bucketNameString, ex);
+ throw new KSMException(ex.getMessage(),
+ KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
+ } finally {
+ metadataManager.writeLock().unlock();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
index c75c8fcf01..359ddba837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
@@ -29,9 +29,11 @@
public class KSMMetrics {
// KSM op metrics
private @Metric MutableCounterLong numVolumeCreates;
+ private @Metric MutableCounterLong numBucketCreates;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
+ private @Metric MutableCounterLong numBucketCreateFails;
public KSMMetrics() {
}
@@ -47,17 +49,36 @@ public void incNumVolumeCreates() {
numVolumeCreates.incr();
}
+ public void incNumBucketCreates() {
+ numBucketCreates.incr();
+ }
+
public void incNumVolumeCreateFails() {
numVolumeCreates.incr();
}
+ public void incNumBucketCreateFails() {
+ numBucketCreateFails.incr();
+ }
+
@VisibleForTesting
public long getNumVolumeCreates() {
return numVolumeCreates.value();
}
+ @VisibleForTesting
+ public long getNumBucketCreates() {
+ return numBucketCreates.value();
+ }
+
@VisibleForTesting
public long getNumVolumeCreateFails() {
return numVolumeCreateFails.value();
}
+
+ @VisibleForTesting
+ public long getNumBucketCreateFails() {
+ return numBucketCreateFails.value();
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 2ffeee7683..2578a9aced 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -23,13 +23,14 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
+import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocolPB
- .KeyspaceManagerProtocolServerSideTranslatorPB;
+ .KeySpaceManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -46,7 +47,7 @@
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.ozone.protocol.proto
- .KeySpaceManagerProtocolProtos.KeyspaceManagerService
+ .KeySpaceManagerProtocolProtos.KeySpaceManagerService
.newReflectiveBlockingService;
import static org.apache.hadoop.util.ExitUtil.terminate;
@@ -54,13 +55,15 @@
* Ozone Keyspace manager is the metadata manager of ozone.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
-public class KeySpaceManager implements KeyspaceManagerProtocol {
+public class KeySpaceManager implements KeySpaceManagerProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(KeySpaceManager.class);
private final RPC.Server ksmRpcServer;
private final InetSocketAddress ksmRpcAddress;
+ private final MetadataManager metadataManager;
private final VolumeManager volumeManager;
+ private final BucketManager bucketManager;
private final KSMMetrics metrics;
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
@@ -71,7 +74,7 @@ public KeySpaceManager(OzoneConfiguration conf) throws IOException {
ProtobufRpcEngine.class);
BlockingService ksmService = newReflectiveBlockingService(
- new KeyspaceManagerProtocolServerSideTranslatorPB(this));
+ new KeySpaceManagerProtocolServerSideTranslatorPB(this));
final InetSocketAddress ksmNodeRpcAddr = OzoneClientUtils.
getKsmAddress(conf);
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
@@ -79,7 +82,9 @@ public KeySpaceManager(OzoneConfiguration conf) throws IOException {
handlerCount);
ksmRpcAddress = updateListenAddress(conf,
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
- volumeManager = new VolumeManagerImpl(this, conf);
+ metadataManager = new MetadataManagerImpl(conf);
+ volumeManager = new VolumeManagerImpl(metadataManager, conf);
+ bucketManager = new BucketManagerImpl(metadataManager);
metrics = KSMMetrics.create();
}
@@ -185,7 +190,7 @@ private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
public void start() {
LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
ksmRpcAddress));
- volumeManager.start();
+ metadataManager.start();
ksmRpcServer.start();
}
@@ -195,7 +200,7 @@ public void start() {
public void stop() {
try {
ksmRpcServer.stop();
- volumeManager.stop();
+ metadataManager.stop();
} catch (IOException e) {
LOG.info("Key Space Manager stop failed.", e);
}
@@ -221,8 +226,13 @@ public void join() {
*/
@Override
public void createVolume(KsmVolumeArgs args) throws IOException {
- metrics.incNumVolumeCreates();
- volumeManager.createVolume(args);
+ try {
+ metrics.incNumVolumeCreates();
+ volumeManager.createVolume(args);
+ } catch (Exception ex) {
+ metrics.incNumVolumeCreateFails();
+ throw ex;
+ }
}
/**
@@ -317,4 +327,21 @@ public List listAllVolumes(String prefix, String prevKey, long
maxKeys) throws IOException {
return null;
}
+
+ /**
+ * Creates a bucket.
+ *
+ * @param args - Arguments to create Bucket.
+ * @throws IOException
+ */
+ @Override
+ public void createBucket(KsmBucketArgs args) throws IOException {
+ try {
+ metrics.incNumBucketCreates();
+ bucketManager.createBucket(args);
+ } catch (Exception ex) {
+ metrics.incNumBucketCreateFails();
+ throw ex;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
new file mode 100644
index 0000000000..71269b2a21
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * KSM metadata manager interface.
+ */
+public interface MetadataManager {
+ /**
+ * Start metadata manager.
+ */
+ void start();
+
+ /**
+ * Stop metadata manager.
+ */
+ void stop() throws IOException;
+
+ /**
+ * Returns the read lock used on Metadata DB.
+ * @return readLock
+ */
+ Lock readLock();
+
+ /**
+ * Returns the write lock used on Metadata DB.
+ * @return writeLock
+ */
+ Lock writeLock();
+
+ /**
+ * Returns the value associated with this key.
+ * @param key - key
+ * @return value
+ */
+ byte[] get(byte[] key);
+
+ /**
+ * Puts a Key into Metadata DB.
+ * @param key - key
+ * @param value - value
+ */
+ void put(byte[] key, byte[] value);
+
+ /**
+ * Performs a batch Put to Metadata DB.
+ * Can be used to do multiple puts atomically.
+ * @param list - list of Map.Entry
+ */
+ void batchPut(List> list) throws IOException;
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
new file mode 100644
index 0000000000..f4b0440684
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+ .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+ .OZONE_KSM_DB_CACHE_SIZE_MB;
+
+/**
+ * KSM metadata manager interface.
+ */
+public class MetadataManagerImpl implements MetadataManager {
+
+ private final LevelDBStore store;
+ private final ReadWriteLock lock;
+
+
+ public MetadataManagerImpl(OzoneConfiguration conf) throws IOException {
+ File metaDir = OzoneUtils.getScmMetadirPath(conf);
+ final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
+ OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
+ Options options = new Options();
+ options.cacheSize(cacheSize * OzoneConsts.MB);
+ File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
+ this.store = new LevelDBStore(ksmDBFile, options);
+ this.lock = new ReentrantReadWriteLock();
+ }
+
+ /**
+ * Start metadata manager.
+ */
+ @Override
+ public void start() {
+
+ }
+
+ /**
+ * Stop metadata manager.
+ */
+ @Override
+ public void stop() throws IOException {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ /**
+ * Returns the read lock used on Metadata DB.
+ * @return readLock
+ */
+ @Override
+ public Lock readLock() {
+ return lock.readLock();
+ }
+
+ /**
+ * Returns the write lock used on Metadata DB.
+ * @return writeLock
+ */
+ @Override
+ public Lock writeLock() {
+ return lock.writeLock();
+ }
+
+ /**
+ * Returns the value associated with this key.
+ * @param key - key
+ * @return value
+ */
+ @Override
+ public byte[] get(byte[] key) {
+ return store.get(key);
+ }
+
+ /**
+ * Puts a Key into Metadata DB.
+ * @param key - key
+ * @param value - value
+ */
+ @Override
+ public void put(byte[] key, byte[] value) {
+ store.put(key, value);
+ }
+
+ /**
+ * Performs a batch Put to Metadata DB.
+ * Can be used to do multiple puts atomically.
+ * @param list - list of Map.Entry
+ */
+ @Override
+ public void batchPut(List> list)
+ throws IOException {
+ WriteBatch batch = store.createWriteBatch();
+ list.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
+ try {
+ store.commitWriteBatch(batch);
+ } finally {
+ store.closeWriteBatch(batch);
+ }
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
index e5bb4bd82e..6c2f0a316f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
@@ -24,15 +24,6 @@
* KSM volume manager interface.
*/
public interface VolumeManager {
- /**
- * Start volume manager.
- */
- void start();
-
- /**
- * Stop volume manager.
- */
- void stop() throws IOException;
/**
* Create a new volume.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index 1e63127a38..ff0d087797 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -20,32 +20,21 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
+import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.Map;
-import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
-import static org.apache.hadoop.ozone.ksm
- .KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.ksm
- .KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.ksm
@@ -60,9 +49,7 @@ public class VolumeManagerImpl implements VolumeManager {
private static final Logger LOG =
LoggerFactory.getLogger(VolumeManagerImpl.class);
- private final KeySpaceManager ksm;
- private final LevelDBStore store;
- private final ReadWriteLock lock;
+ private final MetadataManager metadataManager;
private final int maxUserVolumeCount;
/**
@@ -70,30 +57,13 @@ public class VolumeManagerImpl implements VolumeManager {
* @param conf - Ozone configuration.
* @throws IOException
*/
- public VolumeManagerImpl(KeySpaceManager ksm, OzoneConfiguration conf)
- throws IOException {
- File metaDir = OzoneUtils.getScmMetadirPath(conf);
- final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
- OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
- Options options = new Options();
- options.cacheSize(cacheSize * OzoneConsts.MB);
- File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
- this.ksm = ksm;
- this.store = new LevelDBStore(ksmDBFile, options);
- lock = new ReentrantReadWriteLock();
+ public VolumeManagerImpl(MetadataManager metadataManager,
+ OzoneConfiguration conf) throws IOException {
+ this.metadataManager = metadataManager;
this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
}
- @Override
- public void start() {
- }
-
- @Override
- public void stop() throws IOException {
- store.close();
- }
-
/**
* Creates a volume.
* @param args - KsmVolumeArgs.
@@ -101,10 +71,11 @@ public void stop() throws IOException {
@Override
public void createVolume(KsmVolumeArgs args) throws IOException {
Preconditions.checkNotNull(args);
- lock.writeLock().lock();
- WriteBatch batch = store.createWriteBatch();
+ metadataManager.writeLock().lock();
+ List> batch = new LinkedList<>();
try {
- byte[] volumeName = store.get(DFSUtil.string2Bytes(args.getVolume()));
+ byte[] volumeName = metadataManager.
+ get(DFSUtil.string2Bytes(args.getVolume()));
// Check of the volume already exists
if(volumeName != null) {
@@ -114,7 +85,8 @@ public void createVolume(KsmVolumeArgs args) throws IOException {
// Next count the number of volumes for the user
String dbUserName = "$" + args.getOwnerName();
- byte[] volumeList = store.get(DFSUtil.string2Bytes(dbUserName));
+ byte[] volumeList = metadataManager
+ .get(DFSUtil.string2Bytes(dbUserName));
List prevVolList;
if (volumeList != null) {
VolumeList vlist = VolumeList.parseFrom(volumeList);
@@ -128,26 +100,25 @@ public void createVolume(KsmVolumeArgs args) throws IOException {
throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
}
- // Commit the volume information to leveldb
+ // Commit the volume information to metadataManager
VolumeInfo volumeInfo = args.getProtobuf();
- batch.put(DFSUtil.string2Bytes(args.getVolume()),
- volumeInfo.toByteArray());
+ batch.add(new AbstractMap.SimpleEntry<>(
+ DFSUtil.string2Bytes(args.getVolume()), volumeInfo.toByteArray()));
prevVolList.add(args.getVolume());
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
- batch.put(DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray());
- store.commitWriteBatch(batch);
+ batch.add(new AbstractMap.SimpleEntry<>(
+ DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray()));
+ metadataManager.batchPut(batch);
LOG.info("created volume:{} user:{}",
args.getVolume(), args.getOwnerName());
} catch (IOException | DBException ex) {
- ksm.getMetrics().incNumVolumeCreateFails();
LOG.error("Volume creation failed for user:{} volname:{}",
args.getOwnerName(), args.getVolume(), ex);
throw ex;
} finally {
- store.closeWriteBatch(batch);
- lock.writeLock().unlock();
+ metadataManager.writeLock().unlock();
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
index 1a1b3a941a..e1b90c37b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
@@ -98,6 +98,8 @@ public KSMException.ResultCodes getResult() {
public enum ResultCodes {
FAILED_TOO_MANY_USER_VOLUMES,
FAILED_VOLUME_ALREADY_EXISTS,
+ FAILED_VOLUME_NOT_FOUND,
+ FAILED_BUCKET_ALREADY_EXISTS,
FAILED_INTERNAL_ERROR
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java
new file mode 100644
index 0000000000..3797bfa1b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.ozone.web.request.OzoneAcl;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
+
+/**
+ * Utilities for converting protobuf classes.
+ */
+public final class KSMPBHelper {
+
+ private KSMPBHelper() {
+ /** Hidden constructor */
+ }
+
+ /**
+ * Returns protobuf's OzoneAclInfo of the current instance.
+ * @return OzoneAclInfo
+ */
+ public static OzoneAclInfo convertOzoneAcl(OzoneAcl acl) {
+ OzoneAclInfo.OzoneAclType aclType;
+ switch(acl.getType()) {
+ case USER:
+ aclType = OzoneAclType.USER;
+ break;
+ case GROUP:
+ aclType = OzoneAclType.GROUP;
+ break;
+ case WORLD:
+ aclType = OzoneAclType.WORLD;
+ break;
+ default:
+ throw new IllegalArgumentException("ACL type is not recognized");
+ }
+ OzoneAclInfo.OzoneAclRights aclRights;
+ switch(acl.getRights()) {
+ case READ:
+ aclRights = OzoneAclRights.READ;
+ break;
+ case WRITE:
+ aclRights = OzoneAclRights.WRITE;
+ break;
+ case READ_WRITE:
+ aclRights = OzoneAclRights.READ_WRITE;
+ break;
+ default:
+ throw new IllegalArgumentException("ACL right is not recognized");
+ }
+
+ return OzoneAclInfo.newBuilder().setType(aclType)
+ .setName(acl.getName())
+ .setRights(aclRights)
+ .build();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
similarity index 77%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
index aa52c17b37..85eca042f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
@@ -18,11 +18,16 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
+import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.CreateBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+ .KeySpaceManagerProtocolProtos.CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
@@ -56,19 +61,19 @@
/**
* This class is the server-side translator that forwards requests received on
* {@link org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB} to the
- * KeyspaceManagerService server implementation.
+ * KeySpaceManagerService server implementation.
*/
-public class KeyspaceManagerProtocolServerSideTranslatorPB implements
+public class KeySpaceManagerProtocolServerSideTranslatorPB implements
KeySpaceManagerProtocolPB {
- private final KeyspaceManagerProtocol impl;
+ private final KeySpaceManagerProtocol impl;
/**
* Constructs an instance of the server handler.
*
* @param impl KeySpaceManagerProtocolPB
*/
- public KeyspaceManagerProtocolServerSideTranslatorPB(
- KeyspaceManagerProtocol impl) {
+ public KeySpaceManagerProtocolServerSideTranslatorPB(
+ KeySpaceManagerProtocol impl) {
this.impl = impl;
}
@@ -131,4 +136,28 @@ public ListVolumeResponse listVolumes(
throws ServiceException {
return null;
}
+
+ @Override
+ public CreateBucketResponse createBucket(
+ RpcController controller, CreateBucketRequest
+ request) throws ServiceException {
+ CreateBucketResponse.Builder resp =
+ CreateBucketResponse.newBuilder();
+ try {
+ impl.createBucket(KsmBucketArgs.getFromProtobuf(
+ request.getBucketInfo()));
+ resp.setStatus(Status.OK);
+ } catch (KSMException ksmEx) {
+ if (ksmEx.getResult() ==
+ ResultCodes.FAILED_VOLUME_NOT_FOUND) {
+ resp.setStatus(Status.VOLUME_NOT_FOUND);
+ } else if (ksmEx.getResult() ==
+ ResultCodes.FAILED_BUCKET_ALREADY_EXISTS) {
+ resp.setStatus(Status.BUCKET_ALREADY_EXISTS);
+ }
+ } catch(IOException ex) {
+ resp.setStatus(Status.INTERNAL_ERROR);
+ }
+ return resp.build();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index e96d3d10cd..7a9bd4e9da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -18,20 +18,29 @@
package org.apache.hadoop.ozone.web.storage;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+ .ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+ .ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+ .ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset
+ .LengthInputStream;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ksm.protocolPB
+ .KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.protocol.LocatedContainer;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+ .StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@@ -59,7 +68,6 @@
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
/**
* A {@link StorageHandler} implementation that distributes object storage
@@ -167,22 +175,38 @@ public VolumeInfo getVolumeInfo(VolumeArgs args)
@Override
public void createBucket(final BucketArgs args)
throws IOException, OzoneException {
- String containerKey = buildContainerKey(args.getVolumeName(),
- args.getBucketName());
- XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
- try {
- BucketInfo bucket = new BucketInfo();
- bucket.setVolumeName(args.getVolumeName());
- bucket.setBucketName(args.getBucketName());
- bucket.setAcls(args.getAddAcls());
- bucket.setVersioning(args.getVersioning());
- bucket.setStorageType(args.getStorageType());
- KeyData containerKeyData = fromBucketToContainerKeyData(
- xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
- putKey(xceiverClient, containerKeyData, args.getRequestID());
- } finally {
- xceiverClientManager.releaseClient(xceiverClient);
+ KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+ args.getAddAcls().forEach(acl ->
+ builder.addAddAcl(KSMPBHelper.convertOzoneAcl(acl)));
+ args.getRemoveAcls().forEach(acl ->
+ builder.addRemoveAcl(KSMPBHelper.convertOzoneAcl(acl)));
+ builder.setVolumeName(args.getVolumeName())
+ .setBucketName(args.getBucketName())
+ .setIsVersionEnabled(getBucketVersioningProtobuf(
+ args.getVersioning()))
+ .setStorageType(args.getStorageType());
+ keySpaceManagerClient.createBucket(builder.build());
+ }
+
+ /**
+ * Converts OzoneConts.Versioning enum to boolean.
+ *
+ * @param version
+ * @return corresponding boolean value
+ */
+ private boolean getBucketVersioningProtobuf(
+ Versioning version) {
+ if(version != null) {
+ switch(version) {
+ case ENABLED:
+ return true;
+ case NOT_DEFINED:
+ case DISABLED:
+ default:
+ return false;
+ }
}
+ return false;
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
new file mode 100644
index 0000000000..5bd9ec67d6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.ksm.exceptions
+ .KSMException.ResultCodes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.mockito.Mockito.any;
+
+/**
+ * Tests BucketManagerImpl, mocks MetadataManager for testing.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestBucketManagerImpl {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private MetadataManager getMetadataManagerMock(String... volumesToCreate)
+ throws IOException {
+ MetadataManager metadataManager = Mockito.mock(MetadataManager.class);
+ Map metadataDB = new HashMap<>();
+ ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock());
+ Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock());
+ Mockito.doAnswer(
+ new Answer() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ metadataDB.put(DFSUtil.bytes2String(
+ (byte[])invocation.getArguments()[0]),
+ (byte[])invocation.getArguments()[1]);
+ return null;
+ }
+ }).when(metadataManager).put(any(byte[].class), any(byte[].class));
+
+ Mockito.when(metadataManager.get(any(byte[].class))).thenAnswer(
+ (InvocationOnMock invocation) ->
+ metadataDB.get(DFSUtil.bytes2String(
+ (byte[])invocation.getArguments()[0]))
+ );
+ for(String volumeName : volumesToCreate) {
+ byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName);
+ metadataDB.put(volumeName, dummyVolumeInfo);
+ }
+ return metadataManager;
+ }
+
+ @Test
+ public void testCreateBucketWithoutVolume() throws IOException {
+ thrown.expectMessage("Volume doesn't exist");
+ MetadataManager metaMgr = getMetadataManagerMock();
+ try {
+ BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+ KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+ .setVolumeName("sampleVol")
+ .setBucketName("bucketOne")
+ .setStorageType(StorageType.DISK)
+ .setIsVersionEnabled(false)
+ .build();
+ bucketManager.createBucket(bucketArgs);
+ } catch(KSMException ksmEx) {
+ Assert.assertEquals(ResultCodes.FAILED_VOLUME_NOT_FOUND,
+ ksmEx.getResult());
+ throw ksmEx;
+ }
+ }
+
+ @Test
+ public void testCreateBucket() throws IOException {
+ MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+ BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+ KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+ .setVolumeName("sampleVol")
+ .setBucketName("bucketOne")
+ .setStorageType(StorageType.DISK)
+ .setIsVersionEnabled(false)
+ .build();
+ bucketManager.createBucket(bucketArgs);
+ //TODO: Use BucketManagerImpl#getBucketInfo to verify creation of bucket.
+ Assert.assertNotNull(metaMgr
+ .get(DFSUtil.string2Bytes("sampleVol/bucketOne")));
+ }
+
+ @Test
+ public void testCreateAlreadyExistingBucket() throws IOException {
+ thrown.expectMessage("Bucket already exist");
+ MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+ try {
+ BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+ KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+ .setVolumeName("sampleVol")
+ .setBucketName("bucketOne")
+ .setStorageType(StorageType.DISK)
+ .setIsVersionEnabled(false)
+ .build();
+ bucketManager.createBucket(bucketArgs);
+ bucketManager.createBucket(bucketArgs);
+ } catch(KSMException ksmEx) {
+ Assert.assertEquals(ResultCodes.FAILED_BUCKET_ALREADY_EXISTS,
+ ksmEx.getResult());
+ throw ksmEx;
+ }
+ }
+}
\ No newline at end of file