From 17a87080399e10386d3758b62e80ec91db7142f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Mon, 10 Dec 2018 10:05:38 +0100 Subject: [PATCH] HDDS-879. MultipartUpload: Add InitiateMultipartUpload in ozone. Contributed by Bharat Viswanadham. --- .../org/apache/hadoop/utils/db/DBStore.java | 2 + .../hadoop/ozone/client/OzoneBucket.java | 30 ++++++ .../ozone/client/protocol/ClientProtocol.java | 17 ++++ .../hadoop/ozone/client/rest/RestClient.java | 12 +++ .../hadoop/ozone/client/rpc/RpcClient.java | 23 +++++ .../apache/hadoop/ozone/audit/OMAction.java | 3 +- .../hadoop/ozone/om/OMMetadataManager.java | 22 +++++ .../om/codec/OmMultipartKeyInfoCodec.java | 56 +++++++++++ .../ozone/om/helpers/OmMultipartInfo.java | 77 +++++++++++++++ .../ozone/om/helpers/OmMultipartKeyInfo.java | 93 +++++++++++++++++++ .../om/protocol/OzoneManagerProtocol.java | 9 ++ ...ManagerProtocolClientSideTranslatorPB.java | 35 +++++++ .../src/main/proto/OzoneManagerProtocol.proto | 29 ++++++ .../om/codec/TestOmMultipartKeyInfoCodec.java | 54 +++++++++++ .../hadoop/ozone/om/codec/package-info.java | 24 +++++ .../ozone/client/rpc/TestOzoneRpcClient.java | 70 ++++++++++++++ .../apache/hadoop/ozone/om/TestOmMetrics.java | 18 +++- .../apache/hadoop/ozone/om/KeyManager.java | 10 ++ .../hadoop/ozone/om/KeyManagerImpl.java | 78 ++++++++++++++++ .../org/apache/hadoop/ozone/om/OMMetrics.java | 19 ++++ .../ozone/om/OmMetadataManagerImpl.java | 24 +++++ .../apache/hadoop/ozone/om/OzoneManager.java | 21 +++++ .../ozone/om/exceptions/OMException.java | 3 +- ...ManagerProtocolServerSideTranslatorPB.java | 33 +++++++ 24 files changed, 758 insertions(+), 4 deletions(-) create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java create mode 100644 hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java create mode 100644 hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/package-info.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java index 23498bd22f..3965b9d7b6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java @@ -47,6 +47,8 @@ public interface DBStore extends AutoCloseable { * Gets an existing TableStore with implicit key/value conversion. * * @param name - Name of the TableStore to get + * @param keyType + * @param valueType * @return - TableStore. * @throws IOException on Failure */ diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java index 8cc65eb924..cc8ddcdfab 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import java.io.IOException; import java.util.Iterator; @@ -323,6 +324,35 @@ public void renameKey(String fromKeyName, String toKeyName) proxy.renameKey(volumeName, name, fromKeyName, toKeyName); } + /** + * Initiate multipart upload for a specified key. + * @param keyName + * @param type + * @param factor + * @return OmMultipartInfo + * @throws IOException + */ + public OmMultipartInfo initiateMultipartUpload(String keyName, + ReplicationType type, + ReplicationFactor factor) + throws IOException { + return proxy.initiateMultipartUpload(volumeName, name, keyName, type, + factor); + } + + /** + * Initiate multipart upload for a specified key, with default replication + * type RATIS and with replication factor THREE. + * @param key Name of the key to be created. + * @return OmMultipartInfo. + * @throws IOException + */ + public OmMultipartInfo initiateMultipartUpload(String key) + throws IOException { + return initiateMultipartUpload(key, defaultReplicationType, + defaultReplication); + } + /** * An Iterator to iterate over {@link OzoneKey} list. */ diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index be5670db18..8afbd9876f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import java.io.IOException; import java.util.List; @@ -385,4 +386,20 @@ List listS3Buckets(String userName, String bucketPrefix, */ void close() throws IOException; + + /** + * Initiate Multipart upload. + * @param volumeName + * @param bucketName + * @param keyName + * @param type + * @param factor + * @return {@link OmMultipartInfo} + * @throws IOException + */ + OmMultipartInfo initiateMultipartUpload(String volumeName, String + bucketName, String keyName, ReplicationType type, ReplicationFactor + factor) throws IOException; + + } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java index 8889d999de..05f45c53eb 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java @@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails; import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort; import org.apache.hadoop.ozone.web.response.ListBuckets; @@ -951,4 +952,15 @@ private void addQueryParamter(String param, String value, builder.addParameter(param, value); } } + + @Override + public OmMultipartInfo initiateMultipartUpload(String volumeName, + String bucketName, + String keyName, + ReplicationType type, + ReplicationFactor factor) + throws IOException { + throw new UnsupportedOperationException("Ozone REST protocol does not " + + "support this operation."); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 334f713ff7..7aa359689e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -678,4 +679,26 @@ public void close() throws IOException { IOUtils.cleanupWithLogger(LOG, ozoneManagerClient); IOUtils.cleanupWithLogger(LOG, xceiverClientManager); } + + @Override + public OmMultipartInfo initiateMultipartUpload(String volumeName, + String bucketName, + String keyName, + ReplicationType type, + ReplicationFactor factor) + throws IOException { + HddsClientUtils.verifyResourceName(volumeName, bucketName); + HddsClientUtils.checkNotNull(keyName, type, factor); + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setType(HddsProtos.ReplicationType.valueOf(type.toString())) + .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) + .build(); + OmMultipartInfo multipartInfo = ozoneManagerClient + .initiateMultipartUpload(keyArgs); + return multipartInfo; + } + } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java index 879401415d..b08a530b2a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java @@ -46,7 +46,8 @@ public enum OMAction implements AuditAction { READ_VOLUME, READ_BUCKET, READ_KEY, - LIST_S3BUCKETS; + LIST_S3BUCKETS, + INITIATE_MULTIPART_UPLOAD; @Override public String getAction() { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index 0374056c8d..94ff0dbb6a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; import org.apache.hadoop.utils.db.DBStore; @@ -251,6 +252,27 @@ List listVolumes(String userName, String prefix, */ Table getS3Table(); + + /** + * Returns the DB key name of a multipart upload key in OM metadata store. + * + * @param volume - volume name + * @param bucket - bucket name + * @param key - key name + * @param uploadId - the upload id for this key + * @return bytes of DB key. + */ + String getMultipartKey(String volume, String bucket, String key, String + uploadId); + + + /** + * Gets the multipart info table which holds the information about + * multipart upload information of the keys. + * @return Table + */ + Table getMultipartInfoTable(); + /** * Returns number of rows in a table. This should not be used for very * large tables. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java new file mode 100644 index 0000000000..4445bf3ea2 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.codec; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.utils.db.Codec; + + +/** + * Codec Registry for OmMultipartKeyInfo. + */ +public class OmMultipartKeyInfoCodec implements Codec { + + @Override + public byte[] toPersistedFormat(OmMultipartKeyInfo object) { + Preconditions.checkNotNull(object, + "Null object can't be converted to byte array."); + return object.getProto().toByteArray(); + + } + + @Override + /** + * Construct {@link OmMultipartKeyInfo} from byte[]. If unable to convert + * return null. + */ + public OmMultipartKeyInfo fromPersistedFormat(byte[] rawData) { + Preconditions.checkNotNull( + "Null byte array can't converted to real object."); + try { + return OmMultipartKeyInfo.getFromProto(OzoneManagerProtocolProtos + .MultipartKeyInfo.parseFrom(rawData)); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException( + "Can't encode the the raw data from the byte array", e); + } + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java new file mode 100644 index 0000000000..98913d3ff7 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +/** + * Class which holds information about the response of initiate multipart + * upload request. + */ +public class OmMultipartInfo { + + private String volumeName; + private String bucketName; + private String keyName; + private String uploadID; + + /** + * Construct OmMultipartInfo object which holds information about the + * response from initiate multipart upload request. + * @param volume + * @param bucket + * @param key + * @param id + */ + public OmMultipartInfo(String volume, String bucket, String key, String id) { + this.volumeName = volume; + this.bucketName = bucket; + this.keyName = key; + this.uploadID = id; + } + + /** + * Return volume name. + * @return volumeName + */ + public String getVolumeName() { + return volumeName; + } + + /** + * Return bucket name. + * @return bucketName + */ + public String getBucketName() { + return bucketName; + } + + /** + * Return key name. + * @return keyName + */ + public String getKeyName() { + return keyName; + } + + /** + * Return uploadID. + * @return uploadID + */ + public String getUploadID() { + return uploadID; + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java new file mode 100644 index 0000000000..243b901020 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.helpers; + +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartKeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .PartKeyInfo; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class represents multipart upload information for a key, which holds + * upload part information of the key. + */ +public class OmMultipartKeyInfo { + private String uploadID; + private Map partKeyInfoList; + + /** + * Construct OmMultipartKeyInfo object which holds multipart upload + * information for a key. + * @param id + * @param list upload parts of a key. + */ + public OmMultipartKeyInfo(String id, Map list) { + this.uploadID = id; + this.partKeyInfoList = new HashMap<>(list); + } + + /** + * Returns the uploadID for this multi part upload of a key. + * @return uploadID + */ + public String getUploadID() { + return uploadID; + } + + + /** + * Construct OmMultipartInfo from MultipartKeyInfo proto object. + * @param multipartKeyInfo + * @return OmMultipartKeyInfo + */ + public static OmMultipartKeyInfo getFromProto(MultipartKeyInfo + multipartKeyInfo) { + Map list = new HashMap<>(); + multipartKeyInfo.getPartKeyInfoListList().stream().forEach(partKeyInfo + -> list.put(partKeyInfo.getPartNumber(), partKeyInfo)); + return new OmMultipartKeyInfo(multipartKeyInfo.getUploadID(), list); + } + + /** + * Construct MultipartKeyInfo from this object. + * @return MultipartKeyInfo + */ + public MultipartKeyInfo getProto() { + MultipartKeyInfo.Builder builder = MultipartKeyInfo.newBuilder() + .setUploadID(uploadID); + partKeyInfoList.forEach((key, value) -> builder.addPartKeyInfoList(value)); + return builder.build(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + return other instanceof OmMultipartKeyInfo && uploadID.equals( + ((OmMultipartKeyInfo)other).getUploadID()); + } + + @Override + public int hashCode() { + return uploadID.hashCode(); + } + +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index e4cce6505e..261232794e 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.om.protocol; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -304,5 +305,13 @@ List listS3Buckets(String userName, String startBucketName, String bucketPrefix, int maxNumOfBuckets) throws IOException; + + /** + * Initiate multipart upload for the specified key. + * @param keyArgs + * @return MultipartInfo + * @throws IOException + */ + OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 6c8c932dac..b533aedbc8 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -70,6 +71,10 @@ .OzoneManagerProtocolProtos.LocateKeyRequest; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.LocateKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartInfoInitiateRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartInfoInitiateResponse; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.RenameKeyRequest; import org.apache.hadoop.ozone.protocol.proto @@ -884,4 +889,34 @@ public List listS3Buckets(String userName, String startKey, public Object getUnderlyingProxyObject() { return null; } + + @Override + public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws + IOException { + + MultipartInfoInitiateRequest.Builder multipartInfoInitiateRequest = + MultipartInfoInitiateRequest.newBuilder(); + + KeyArgs.Builder keyArgs = KeyArgs.newBuilder() + .setVolumeName(omKeyArgs.getVolumeName()) + .setBucketName(omKeyArgs.getBucketName()) + .setKeyName(omKeyArgs.getKeyName()) + .setFactor(omKeyArgs.getFactor()) + .setType(omKeyArgs.getType()); + multipartInfoInitiateRequest.setKeyArgs(keyArgs.build()); + + MultipartInfoInitiateResponse resp; + try { + resp = rpcProxy.initiateMultiPartUpload(NULL_RPC_CONTROLLER, + multipartInfoInitiateRequest.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + if (resp.getStatus() != Status.OK) { + throw new IOException("Initiate Multipart upload failed, error:" + resp + .getStatus()); + } + return new OmMultipartInfo(resp.getVolumeName(), resp.getBucketName(), resp + .getKeyName(), resp.getMultipartUploadID()); + } } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 6320cf1e26..c4c0a977d6 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -60,6 +60,7 @@ enum Status { SCM_VERSION_MISMATCH_ERROR = 21; S3_BUCKET_NOT_FOUND = 22; S3_BUCKET_ALREADY_EXISTS = 23; + INITIATE_MULTIPART_UPLOAD_ERROR = 24; } @@ -406,6 +407,31 @@ message S3ListBucketsResponse { } +message MultipartInfoInitiateRequest { + required KeyArgs keyArgs = 1; +} + +message MultipartInfoInitiateResponse { + required string volumeName = 1; + required string bucketName = 2; + required string keyName = 3; + required string multipartUploadID = 4; + required Status status = 5; +} + +message MultipartKeyInfo { + required string uploadID = 4; + repeated PartKeyInfo partKeyInfoList = 5; +} + +message PartKeyInfo { + required string partName = 1; + required uint32 partNumber = 2; + required KeyInfo partKeyInfo = 3; +} + + + /** The OM service that takes care of Ozone namespace. */ @@ -541,4 +567,7 @@ service OzoneManagerService { rpc listS3Buckets(S3ListBucketsRequest) returns(S3ListBucketsResponse); + + rpc initiateMultiPartUpload(MultipartInfoInitiateRequest) + returns (MultipartInfoInitiateResponse); } diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java new file mode 100644 index 0000000000..e52444721b --- /dev/null +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.codec; + +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.UUID; + +/** + * This class tests OmMultipartKeyInfoCodec. + */ +public class TestOmMultipartKeyInfoCodec { + + @Test + public void testOmMultipartKeyInfoCodec() { + OmMultipartKeyInfoCodec codec = new OmMultipartKeyInfoCodec(); + OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo(UUID + .randomUUID().toString(), new HashMap<>()); + byte[] data = codec.toPersistedFormat(omMultipartKeyInfo); + Assert.assertNotNull(data); + + OmMultipartKeyInfo multipartKeyInfo = codec.fromPersistedFormat(data); + Assert.assertEquals(omMultipartKeyInfo, multipartKeyInfo); + + // When random byte data passed returns null. + try { + codec.fromPersistedFormat("radom".getBytes()); + } catch (IllegalArgumentException ex) { + GenericTestUtils.assertExceptionContains("Can't encode the the raw " + + "data from the byte array", ex); + } + + } +} diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/package-info.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/package-info.java new file mode 100644 index 0000000000..8b5690a148 --- /dev/null +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ * Utility classes to encode/decode DTO objects to/from byte array. + */ + +/** + * Unit tests for codec's in OM. + */ +package org.apache.hadoop.ozone.om.codec; \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index addd8ad4bc..84afd4c441 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.protocolPB. StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.AfterClass; @@ -75,6 +76,8 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.either; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; /** @@ -1259,6 +1262,73 @@ public void testListKeyOnEmptyBucket() } } + @Test + public void testInitiateMultipartUploadWithReplicationInformationSet() throws + IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + + assertNotNull(multipartInfo); + String uploadID = multipartInfo.getUploadID(); + Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); + Assert.assertEquals(bucketName, multipartInfo.getBucketName()); + Assert.assertEquals(keyName, multipartInfo.getKeyName()); + assertNotNull(multipartInfo.getUploadID()); + + // Call initiate multipart upload for the same key again, this should + // generate a new uploadID. + multipartInfo = bucket.initiateMultipartUpload(keyName, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + + assertNotNull(multipartInfo); + Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); + Assert.assertEquals(bucketName, multipartInfo.getBucketName()); + Assert.assertEquals(keyName, multipartInfo.getKeyName()); + assertNotEquals(multipartInfo.getUploadID(), uploadID); + assertNotNull(multipartInfo.getUploadID()); + } + + @Test + public void testInitiateMultipartUploadWithDefaultReplication() throws + IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName); + + assertNotNull(multipartInfo); + String uploadID = multipartInfo.getUploadID(); + Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); + Assert.assertEquals(bucketName, multipartInfo.getBucketName()); + Assert.assertEquals(keyName, multipartInfo.getKeyName()); + assertNotNull(multipartInfo.getUploadID()); + + // Call initiate multipart upload for the same key again, this should + // generate a new uploadID. + multipartInfo = bucket.initiateMultipartUpload(keyName); + + assertNotNull(multipartInfo); + Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); + Assert.assertEquals(bucketName, multipartInfo.getBucketName()); + Assert.assertEquals(keyName, multipartInfo.getKeyName()); + assertNotEquals(multipartInfo.getUploadID(), uploadID); + assertNotNull(multipartInfo.getUploadID()); + } + + /** * Close OzoneClient and shutdown MiniOzoneCluster. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java index 2ff04af448..b72117ea1c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java @@ -261,18 +261,21 @@ public void testKeyOps() throws IOException { Mockito.doReturn(null).when(mockKm).lookupKey(null); Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0); Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong()); + Mockito.doReturn(null).when(mockKm).initiateMultipartUpload( + any(OmKeyArgs.class)); HddsWhiteboxTestUtils.setInternalState( ozoneManager, "keyManager", mockKm); doKeyOps(); MetricsRecordBuilder omMetrics = getMetrics("OMMetrics"); - assertCounter("NumKeyOps", 5L, omMetrics); + assertCounter("NumKeyOps", 6L, omMetrics); assertCounter("NumKeyAllocate", 1L, omMetrics); assertCounter("NumKeyLookup", 1L, omMetrics); assertCounter("NumKeyDeletes", 1L, omMetrics); assertCounter("NumKeyLists", 1L, omMetrics); assertCounter("NumKeys", 0L, omMetrics); + assertCounter("NumInitiateMultipartUploads", 1L, omMetrics); ozoneManager.openKey(null); @@ -295,22 +298,27 @@ public void testKeyOps() throws IOException { null, null, null, null, 0); Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class), anyLong()); + Mockito.doThrow(exception).when(mockKm).initiateMultipartUpload( + any(OmKeyArgs.class)); HddsWhiteboxTestUtils.setInternalState( ozoneManager, "keyManager", mockKm); doKeyOps(); omMetrics = getMetrics("OMMetrics"); - assertCounter("NumKeyOps", 17L, omMetrics); + assertCounter("NumKeyOps", 19L, omMetrics); assertCounter("NumKeyAllocate", 5L, omMetrics); assertCounter("NumKeyLookup", 2L, omMetrics); assertCounter("NumKeyDeletes", 3L, omMetrics); assertCounter("NumKeyLists", 2L, omMetrics); + assertCounter("NumInitiateMultipartUploads", 2L, omMetrics); assertCounter("NumKeyAllocateFails", 1L, omMetrics); assertCounter("NumKeyLookupFails", 1L, omMetrics); assertCounter("NumKeyDeleteFails", 1L, omMetrics); assertCounter("NumKeyListFails", 1L, omMetrics); + assertCounter("NumInitiateMultipartUploadFails", 1L, omMetrics); + assertCounter("NumKeys", 2L, omMetrics); @@ -412,6 +420,12 @@ private void doKeyOps() { ozoneManager.commitKey(createKeyArgs(), 0); } catch (IOException ignored) { } + + try { + ozoneManager.initiateMultipartUpload(null); + } catch (IOException ignored) { + } + } private OmKeyArgs createKeyArgs() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 38b91f528a..10c4adfad6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.utils.BackgroundService; @@ -180,4 +181,13 @@ List listKeys(String volumeName, */ BackgroundService getDeletingService(); + + /** + * Initiate multipart upload for the specified key. + * @param keyArgs + * @return MultipartInfo + * @throws IOException + */ + OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException; + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 73daff6942..7e954d17c2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -20,6 +20,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.StorageUnit; @@ -37,7 +40,11 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .PartKeyInfo; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BackgroundService; import org.apache.hadoop.utils.db.BatchOperation; @@ -54,6 +61,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -532,4 +540,74 @@ public OMMetadataManager getMetadataManager() { public BackgroundService getDeletingService() { return keyDeletingService; } + + @Override + public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws + IOException { + Preconditions.checkNotNull(omKeyArgs); + String volumeName = omKeyArgs.getVolumeName(); + String bucketName = omKeyArgs.getBucketName(); + String keyName = omKeyArgs.getKeyName(); + + metadataManager.getLock().acquireBucketLock(volumeName, bucketName); + try { + long time = Time.monotonicNowNanos(); + String uploadID = UUID.randomUUID().toString() + "-" + Long.toString( + time); + + // We are adding uploadId to key, because if multiple users try to + // perform multipart upload on the same key, each will try to upload, who + // ever finally commit the key, we see that key in ozone. Suppose if we + // don't add id, and use the same key /volume/bucket/key, when multiple + // users try to upload the key, we update the parts of the key's from + // multiple users to same key, and the key output can be a mix of the + // parts from multiple users. + + // So on same key if multiple time multipart upload is initiated we + // store multiple entries in the openKey Table. + // Checked AWS S3, when we try to run multipart upload, each time a + // new uploadId is returned. + + String multipartKey = metadataManager.getMultipartKey(volumeName, + bucketName, keyName, uploadID); + + // Not checking if there is an already key for this in the keyTable, as + // during final complete multipart upload we take care of this. + + + Map partKeyInfoMap = new HashMap<>(); + OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID, + partKeyInfoMap); + List locations = new ArrayList<>(); + OmKeyInfo omKeyInfo = new OmKeyInfo.Builder() + .setVolumeName(omKeyArgs.getVolumeName()) + .setBucketName(omKeyArgs.getBucketName()) + .setKeyName(omKeyArgs.getKeyName()) + .setCreationTime(Time.now()) + .setModificationTime(Time.now()) + .setReplicationType(omKeyArgs.getType()) + .setReplicationFactor(omKeyArgs.getFactor()) + .setOmKeyLocationInfos(Collections.singletonList( + new OmKeyLocationInfoGroup(0, locations))) + .build(); + DBStore store = metadataManager.getStore(); + try (BatchOperation batch = store.initBatchOperation()) { + // Create an entry in open key table and multipart info table for + // this key. + metadataManager.getMultipartInfoTable().putWithBatch(batch, + multipartKey, multipartKeyInfo); + metadataManager.getOpenKeyTable().putWithBatch(batch, + multipartKey, omKeyInfo); + store.commitBatchOperation(batch); + return new OmMultipartInfo(volumeName, bucketName, keyName, uploadID); + } + } catch (IOException ex) { + LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " + + "key:{}", volumeName, bucketName, keyName, ex); + throw new OMException(ex.getMessage(), + OMException.ResultCodes.INITIATE_MULTIPART_UPLOAD_FAILED); + } finally { + metadataManager.getLock().releaseBucketLock(volumeName, bucketName); + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index f6925ca3a9..9f9276756e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -61,6 +61,7 @@ public class OMMetrics { private @Metric MutableCounterLong numAllocateBlockCalls; private @Metric MutableCounterLong numGetServiceLists; private @Metric MutableCounterLong numListS3Buckets; + private @Metric MutableCounterLong numInitiateMultipartUploads; // Failure Metrics @@ -84,6 +85,7 @@ public class OMMetrics { private @Metric MutableCounterLong numBlockAllocateCallFails; private @Metric MutableCounterLong numGetServiceListFails; private @Metric MutableCounterLong numListS3BucketsFails; + private @Metric MutableCounterLong numInitiateMultipartUploadFails; // Metrics for total number of volumes, buckets and keys @@ -225,6 +227,15 @@ public void incNumListS3BucketsFails() { numListS3BucketsFails.incr(); } + public void incNumInitiateMultipartUploads() { + numKeyOps.incr(); + numInitiateMultipartUploads.incr(); + } + + public void incNumInitiateMultipartUploadFails() { + numInitiateMultipartUploadFails.incr(); + } + public void incNumGetServiceLists() { numGetServiceLists.incr(); } @@ -535,6 +546,14 @@ public long getNumListS3BucketsFails() { return numListS3BucketsFails.value(); } + public long getNumInitiateMultipartUploads() { + return numInitiateMultipartUploads.value(); + } + + public long getNumInitiateMultipartUploadFails() { + return numInitiateMultipartUploadFails.value(); + } + public void unRegister() { MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index cfd4a20505..28438a1712 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.codec.OmMultipartKeyInfoCodec; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; @@ -97,6 +99,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager { private static final String DELETED_TABLE = "deletedTable"; private static final String OPEN_KEY_TABLE = "openKeyTable"; private static final String S3_TABLE = "s3Table"; + private static final String MULTIPARTINFO_TABLE = "multipartInfoTable"; private DBStore store; @@ -110,6 +113,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager { private Table deletedTable; private Table openKeyTable; private Table s3Table; + private Table multipartInfoTable; public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException { this.lock = new OzoneManagerLock(conf); @@ -154,6 +158,11 @@ public Table getS3Table() { return s3Table; } + @Override + public Table getMultipartInfoTable() { + return multipartInfoTable; + } + private void checkTableStatus(Table table, String name) throws IOException { String logMessage = "Unable to get a reference to %s table. Cannot " + @@ -186,10 +195,12 @@ public void start(OzoneConfiguration configuration) throws IOException { .addTable(DELETED_TABLE) .addTable(OPEN_KEY_TABLE) .addTable(S3_TABLE) + .addTable(MULTIPARTINFO_TABLE) .addCodec(OmKeyInfo.class, new OmKeyInfoCodec()) .addCodec(OmBucketInfo.class, new OmBucketInfoCodec()) .addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec()) .addCodec(VolumeList.class, new VolumeListCodec()) + .addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec()) .build(); userTable = @@ -219,6 +230,10 @@ public void start(OzoneConfiguration configuration) throws IOException { s3Table = this.store.getTable(S3_TABLE); checkTableStatus(s3Table, S3_TABLE); + multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE, + String.class, OmMultipartKeyInfo.class); + checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE); + } } @@ -301,6 +316,15 @@ public String getOpenKey(String volume, String bucket, return openKey; } + @Override + public String getMultipartKey(String volume, String bucket, String key, + String + uploadId) { + String multipartKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket + + OM_KEY_PREFIX + key + OM_KEY_PREFIX + uploadId; + return multipartKey; + } + /** * Returns the OzoneManagerLock used on Metadata DB. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 2c2024585f..8b0616c7a9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -1553,6 +1554,26 @@ public List listS3Buckets(String userName, String startKey, } } + @Override + public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws + IOException { + OmMultipartInfo multipartInfo; + metrics.incNumInitiateMultipartUploads(); + try { + multipartInfo = keyManager.initiateMultipartUpload(keyArgs); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null : + keyArgs.toAuditMap())); + } catch (IOException ex) { + AUDIT.logWriteFailure(buildAuditMessageForFailure( + OMAction.INITIATE_MULTIPART_UPLOAD, + (keyArgs == null) ? null : keyArgs.toAuditMap(), ex)); + metrics.incNumInitiateMultipartUploadFails(); + throw ex; + } + return multipartInfo; + } + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index 3292adf431..1d096f5052 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -115,6 +115,7 @@ public enum ResultCodes { SCM_VERSION_MISMATCH_ERROR, SCM_IN_CHILL_MODE, S3_BUCKET_ALREADY_EXISTS, - S3_BUCKET_NOT_FOUND + S3_BUCKET_NOT_FOUND, + INITIATE_MULTIPART_UPLOAD_FAILED; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 64806ed885..13d54c5cbe 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -85,6 +86,10 @@ .LocateKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .LocateKeyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartInfoInitiateRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartInfoInitiateResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .RenameKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -190,6 +195,8 @@ private Status exceptionToResponseStatus(IOException ex) { return Status.S3_BUCKET_ALREADY_EXISTS; case S3_BUCKET_NOT_FOUND: return Status.S3_BUCKET_NOT_FOUND; + case INITIATE_MULTIPART_UPLOAD_FAILED: + return Status.INITIATE_MULTIPART_UPLOAD_ERROR; default: return Status.INTERNAL_ERROR; } @@ -651,4 +658,30 @@ public S3ListBucketsResponse listS3Buckets(RpcController controller, } return resp.build(); } + + @Override + public MultipartInfoInitiateResponse initiateMultiPartUpload( + RpcController controller, MultipartInfoInitiateRequest request) { + MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse + .newBuilder(); + try { + KeyArgs keyArgs = request.getKeyArgs(); + OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setType(keyArgs.getType()) + .setFactor(keyArgs.getFactor()) + .build(); + OmMultipartInfo multipartInfo = impl.initiateMultipartUpload(omKeyArgs); + resp.setVolumeName(multipartInfo.getVolumeName()); + resp.setBucketName(multipartInfo.getBucketName()); + resp.setKeyName(multipartInfo.getKeyName()); + resp.setMultipartUploadID(multipartInfo.getUploadID()); + resp.setStatus(Status.OK); + } catch (IOException ex) { + resp.setStatus(exceptionToResponseStatus(ex)); + } + return resp.build(); + } }