From d44b37d7acc1b9e474494335bc9813ed1d826061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Wed, 12 Dec 2018 14:16:21 +0100 Subject: [PATCH] HDDS-889. MultipartUpload: Support uploading a part file in ozone. Contributed by Bharat Viswanadham. --- .../hadoop/ozone/client/OzoneBucket.java | 7 + .../client/io/ChunkGroupOutputStream.java | 47 ++++- .../ozone/client/io/OzoneOutputStream.java | 10 + .../ozone/client/protocol/ClientProtocol.java | 5 + .../hadoop/ozone/client/rest/RestClient.java | 12 ++ .../hadoop/ozone/client/rpc/RpcClient.java | 50 +++++ .../apache/hadoop/ozone/audit/OMAction.java | 3 +- .../hadoop/ozone/om/helpers/OmKeyArgs.java | 42 +++- .../hadoop/ozone/om/helpers/OmKeyInfo.java | 6 + .../OmMultipartCommitUploadPartInfo.java | 34 ++++ .../ozone/om/helpers/OmMultipartKeyInfo.java | 12 ++ .../om/protocol/OzoneManagerProtocol.java | 8 +- ...ManagerProtocolClientSideTranslatorPB.java | 60 +++++- .../src/main/proto/OzoneManagerProtocol.proto | 19 ++ .../ozone/client/rpc/TestOzoneRpcClient.java | 188 +++++++++++++++++- .../apache/hadoop/ozone/om/KeyManager.java | 5 + .../hadoop/ozone/om/KeyManagerImpl.java | 183 ++++++++++++++--- .../org/apache/hadoop/ozone/om/OMMetrics.java | 11 + .../apache/hadoop/ozone/om/OzoneManager.java | 27 +++ .../ozone/om/exceptions/OMException.java | 4 +- ...ManagerProtocolServerSideTranslatorPB.java | 39 ++++ 21 files changed, 717 insertions(+), 55 deletions(-) create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartCommitUploadPartInfo.java diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java index cc8ddcdfab..3a681d7ff5 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java @@ -353,6 +353,13 @@ public OmMultipartInfo initiateMultipartUpload(String key) defaultReplication); } + public OzoneOutputStream createMultipartKey(String key, long size, + int partNumber, String uploadID) + throws IOException { + return proxy.createMultipartKey(volumeName, name, key, size, partNumber, + uploadID); + } + /** * An Iterator to iterate over {@link OzoneKey} list. 
*/
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index bc88255fd5..49835ccd80 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -25,13 +25,9 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -85,6 +81,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final long blockSize;
   private final Checksum checksum;
   private List bufferList;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   /**
    * A constructor for testing purpose only.
    */
@@ -152,7 +149,7 @@ public ChunkGroupOutputStream(OpenKeySession handler,
       OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
-      Checksum checksum) {
+      Checksum checksum, String uploadID, int partNumber, boolean isMultipart) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.omClient = omClient;
@@ -161,6 +158,8 @@ public ChunkGroupOutputStream(OpenKeySession handler,
     this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
         .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
         .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(
+            uploadID).setMultipartUploadPartNumber(partNumber)
         .build();
     this.openID = handler.getId();
     this.xceiverClientManager = xceiverClientManager;
@@ -498,7 +497,15 @@ public void close() throws IOException {
         removeEmptyBlocks();
         keyArgs.setDataSize(getKeyLength());
         keyArgs.setLocationInfoList(getLocationInfoList());
-        omClient.commitKey(keyArgs, openID);
+        // When this stream is writing a multipart upload part, we must not
+        // commit the key: a part is not a standalone key, it is just one
+        // piece of a larger file, so we commit it as a part instead.
+        if (keyArgs.getIsMultipartKey()) {
+          commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs,
+              openID);
+        } else {
+          omClient.commitKey(keyArgs, openID);
+        }
       } else {
        LOG.warn("Closing ChunkGroupOutputStream, but keyArgs is null");
       }
@@ -512,6 +519,10 @@ public void close() throws IOException {
     }
   }

+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return commitUploadPartInfo;
+  }
+
  /**
   * Builder class of ChunkGroupOutputStream.
*/ @@ -529,6 +540,20 @@ public static class Builder { private long blockSize; private long watchTimeout; private Checksum checksum; + private String multipartUploadID; + private int multipartNumber; + private boolean isMultipartKey; + + + public Builder setMultipartUploadID(String uploadID) { + this.multipartUploadID = uploadID; + return this; + } + + public Builder setMultipartNumber(int partNumber) { + this.multipartNumber = partNumber; + return this; + } public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -597,10 +622,16 @@ public Builder setChecksum(Checksum checksumObj){ return this; } + public Builder setIsMultipartKey(boolean isMultipart) { + this.isMultipartKey = isMultipart; + return this; + } + public ChunkGroupOutputStream build() throws IOException { return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, - streamBufferMaxSize, blockSize, watchTimeout, checksum); + streamBufferMaxSize, blockSize, watchTimeout, checksum, + multipartUploadID, multipartNumber, isMultipartKey); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java index 5369220a43..8a896ad16f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.client.io; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; + import java.io.IOException; import java.io.OutputStream; @@ -58,6 +60,14 @@ public synchronized void close() throws IOException { outputStream.close(); } + public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { + if (outputStream instanceof ChunkGroupOutputStream) { + return ((ChunkGroupOutputStream) outputStream).getCommitUploadPartInfo(); + } + // Otherwise return null. 
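+    // (A hedged caller-side sketch: after close(), callers can read the
+    // committed part info and treat the part name as the S3 ETag. The
+    // names below are the ones introduced in this patch.)
+    //   ozoneOutputStream.close();
+    //   OmMultipartCommitUploadPartInfo info =
+    //       ozoneOutputStream.getCommitUploadPartInfo();
+    //   String eTag = (info == null) ? null : info.getPartName();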
+ return null; + } + public OutputStream getOutputStream() { return outputStream; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index 8afbd9876f..02dc530b1e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -401,5 +401,10 @@ OmMultipartInfo initiateMultipartUpload(String volumeName, String bucketName, String keyName, ReplicationType type, ReplicationFactor factor) throws IOException; + OzoneOutputStream createMultipartKey(String volumeName, String bucketName, + String keyName, long size, + int partNumber, String uploadID) + throws IOException; + } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java index 05f45c53eb..b40d5e1a0b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java @@ -963,4 +963,16 @@ public OmMultipartInfo initiateMultipartUpload(String volumeName, throw new UnsupportedOperationException("Ozone REST protocol does not " + "support this operation."); } + + @Override + public OzoneOutputStream createMultipartKey(String volumeName, + String bucketName, + String keyName, + long size, + int partNumber, + String uploadID) + throws IOException { + throw new UnsupportedOperationException("Ozone REST protocol does not " + + "support this operation."); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 7aa359689e..6cf6c4f080 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -701,4 +701,54 @@ public OmMultipartInfo initiateMultipartUpload(String volumeName, return multipartInfo; } + @Override + public OzoneOutputStream createMultipartKey(String volumeName, + String bucketName, + String keyName, + long size, + int partNumber, + String uploadID) + throws IOException { + HddsClientUtils.verifyResourceName(volumeName, bucketName); + HddsClientUtils.checkNotNull(keyName, uploadID); + Preconditions.checkArgument(partNumber > 0, "Part number should be " + + "greater than zero"); + Preconditions.checkArgument(size >=0, "size should be greater than or " + + "equal to zero"); + String requestId = UUID.randomUUID().toString(); + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(size) + .setIsMultipartKey(true) + .setMultipartUploadID(uploadID) + .build(); + + OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); + ChunkGroupOutputStream groupOutputStream = + new ChunkGroupOutputStream.Builder() + .setHandler(openKey) + .setXceiverClientManager(xceiverClientManager) + .setScmClient(storageContainerLocationClient) + .setOmClient(ozoneManagerClient) + .setChunkSize(chunkSize) + .setRequestID(requestId) + .setType(openKey.getKeyInfo().getType()) + .setFactor(openKey.getKeyInfo().getFactor()) + .setStreamBufferFlushSize(streamBufferFlushSize) + 
.setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setBlockSize(blockSize) + .setChecksum(checksum) + .setMultipartNumber(partNumber) + .setMultipartUploadID(uploadID) + .setIsMultipartKey(true) + .build(); + groupOutputStream.addPreallocateBlocks( + openKey.getKeyInfo().getLatestVersionLocations(), + openKey.getOpenVersion()); + return new OzoneOutputStream(groupOutputStream); + } + } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java index b08a530b2a..a65f6bc724 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java @@ -47,7 +47,8 @@ public enum OMAction implements AuditAction { READ_BUCKET, READ_KEY, LIST_S3BUCKETS, - INITIATE_MULTIPART_UPLOAD; + INITIATE_MULTIPART_UPLOAD, + COMMIT_MULTIPART_UPLOAD_PARTKEY; @Override public String getAction() { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index e56ad7f161..7ded812bab 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -39,10 +39,14 @@ public final class OmKeyArgs implements Auditable { private final ReplicationType type; private final ReplicationFactor factor; private List locationInfoList; + private final boolean isMultipartKey; + private final String multipartUploadID; + private final int multipartUploadPartNumber; private OmKeyArgs(String volumeName, String bucketName, String keyName, long dataSize, ReplicationType type, ReplicationFactor factor, - List locationInfoList) { + List locationInfoList, boolean isMultipart, + String uploadID, int partNumber) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; @@ -50,6 +54,21 @@ private OmKeyArgs(String volumeName, String bucketName, String keyName, this.type = type; this.factor = factor; this.locationInfoList = locationInfoList; + this.isMultipartKey = isMultipart; + this.multipartUploadID = uploadID; + this.multipartUploadPartNumber = partNumber; + } + + public boolean getIsMultipartKey() { + return isMultipartKey; + } + + public String getMultipartUploadID() { + return multipartUploadID; + } + + public int getMultipartUploadPartNumber() { + return multipartUploadPartNumber; } public ReplicationType getType() { @@ -123,6 +142,9 @@ public static class Builder { private ReplicationType type; private ReplicationFactor factor; private List locationInfoList; + private boolean isMultipartKey; + private String multipartUploadID; + private int multipartUploadPartNumber; public Builder setVolumeName(String volume) { this.volumeName = volume; @@ -159,9 +181,25 @@ public Builder setLocationInfoList(List locationInfos) { return this; } + public Builder setIsMultipartKey(boolean isMultipart) { + this.isMultipartKey = isMultipart; + return this; + } + + public Builder setMultipartUploadID(String uploadID) { + this.multipartUploadID = uploadID; + return this; + } + + public Builder setMultipartUploadPartNumber(int partNumber) { + this.multipartUploadPartNumber = partNumber; + return this; + } + public OmKeyArgs build() { return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type, - factor, locationInfoList); + factor, 
locationInfoList, isMultipartKey, multipartUploadID, + multipartUploadPartNumber); } } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 9f2eb87092..030844efff 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -215,6 +215,7 @@ public static class Builder { private long modificationTime; private HddsProtos.ReplicationType type; private HddsProtos.ReplicationFactor factor; + private boolean isMultipartKey; public Builder setVolumeName(String volume) { this.volumeName = volume; @@ -262,6 +263,11 @@ public Builder setReplicationType(HddsProtos.ReplicationType replType) { return this; } + public Builder setIsMultipartKey(boolean isMultipart) { + this.isMultipartKey = isMultipart; + return this; + } + public OmKeyInfo build() { return new OmKeyInfo( volumeName, bucketName, keyName, omKeyLocationInfoGroups, diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartCommitUploadPartInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartCommitUploadPartInfo.java new file mode 100644 index 0000000000..646cb421e4 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartCommitUploadPartInfo.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.helpers; + +/** + * This class holds information about the response from commit multipart + * upload part request. + */ +public class OmMultipartCommitUploadPartInfo { + + private final String partName; + + public OmMultipartCommitUploadPartInfo(String name) { + this.partName = name; + } + + public String getPartName() { + return partName; + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java index 243b901020..152091df2f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java @@ -51,6 +51,18 @@ public String getUploadID() { return uploadID; } + public Map getPartKeyInfoList() { + return partKeyInfoList; + } + + public void addPartKeyInfo(int partNumber, PartKeyInfo partKeyInfo) { + this.partKeyInfoList.put(partNumber, partKeyInfo); + } + + public PartKeyInfo getPartKeyInfo(int partNumber) { + return partKeyInfoList.get(partNumber); + } + /** * Construct OmMultipartInfo from MultipartKeyInfo proto object. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 261232794e..ee1ff6fb26 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/
package org.apache.hadoop.ozone.om.protocol;
-
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -313,5 +313,11 @@ List listS3Buckets(String userName, String startBucketName,
    * @throws IOException
    */
  OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
+
+  /**
+   * Commits an uploaded part to an ongoing multipart upload of a key.
+   */
+  OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs omKeyArgs, long clientID) throws IOException;
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index b533aedbc8..ca09f61205 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -71,6 +72,10 @@
     .OzoneManagerProtocolProtos.LocateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.LocateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -556,12 +561,27 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
-        .setFactor(args.getFactor())
-        .setType(args.getType())
         .setKeyName(args.getKeyName());
+
+    if (args.getFactor() != null) {
+      keyArgs.setFactor(args.getFactor());
+    }
+
+    if (args.getType() != null) {
+      keyArgs.setType(args.getType());
+    }
+
     if (args.getDataSize() > 0) {
       keyArgs.setDataSize(args.getDataSize());
     }
+
+    if (args.getMultipartUploadID() != null) {
+      keyArgs.setMultipartUploadID(args.getMultipartUploadID());
+    }
+
+    keyArgs.setIsMultipartKey(args.getIsMultipartKey());
+
     req.setKeyArgs(keyArgs.build());

     final LocateKeyResponse resp;
@@ -919,4 +939,40 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
     return new OmMultipartInfo(resp.getVolumeName(), resp.getBucketName(), resp
         .getKeyName(), resp.getMultipartUploadID());
   }
+
+  @Override
+  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs omKeyArgs, long clientID) throws IOException {
+    MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest
+        = MultipartCommitUploadPartRequest.newBuilder();
+
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(omKeyArgs.getVolumeName())
+        .setBucketName(omKeyArgs.getBucketName())
+        .setKeyName(omKeyArgs.getKeyName())
+        .setMultipartUploadID(omKeyArgs.getMultipartUploadID())
+        .setIsMultipartKey(omKeyArgs.getIsMultipartKey())
+        .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber());
+    multipartCommitUploadPartRequest.setClientID(clientID);
+    multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build());
+
+    MultipartCommitUploadPartResponse response;
+
+    try {
+      response = rpcProxy.commitMultipartUploadPart(NULL_RPC_CONTROLLER,
+          multipartCommitUploadPartRequest.build());
+
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+
+    if (response.getStatus() != Status.OK) {
+      throw new IOException("Commit multipart upload part key failed, error: "
+          + response.getStatus());
+    }
+
+    OmMultipartCommitUploadPartInfo info = new
+        OmMultipartCommitUploadPartInfo(response.getPartName());
+    return info;
+  }
 }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index c4c0a977d6..deb88ee086 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -61,6 +61,8 @@ enum Status {
     S3_BUCKET_NOT_FOUND = 22;
     S3_BUCKET_ALREADY_EXISTS = 23;
     INITIATE_MULTIPART_UPLOAD_ERROR = 24;
+    MULTIPART_UPLOAD_PARTFILE_ERROR = 25;
+    NO_SUCH_MULTIPART_UPLOAD_ERROR = 26;
 }

@@ -244,6 +246,9 @@ message KeyArgs {
     optional hadoop.hdds.ReplicationType type = 5;
     optional hadoop.hdds.ReplicationFactor factor = 6;
     repeated KeyLocation keyLocations = 7;
+    optional bool isMultipartKey = 8;
+    optional string multipartUploadID = 9;
+    optional uint32 multipartNumber = 10;
 }

 message KeyLocation {
@@ -430,6 +435,17 @@ message PartKeyInfo {
     required KeyInfo partKeyInfo = 3;
 }

+message MultipartCommitUploadPartRequest {
+    required KeyArgs keyArgs = 1;
+    required uint64 clientID = 2;
+}
+
+message MultipartCommitUploadPartResponse {
+    // This value is returned to S3 clients as the ETag.
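+    // (Note: in this patch KeyManagerImpl builds the part name as
+    // keyName + clientID, so re-uploading the same part number yields a
+    // new part name; S3 compatibility code can surface it as the ETag.)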
+ optional string partName = 1; + required Status status = 2; +} + /** @@ -570,4 +586,7 @@ service OzoneManagerService { rpc initiateMultiPartUpload(MultipartInfoInitiateRequest) returns (MultipartInfoInitiateResponse); + + rpc commitMultipartUploadPart(MultipartCommitUploadPartRequest) + returns (MultipartCommitUploadPartResponse); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 84afd4c441..474b9204a4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -31,12 +31,14 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ozone.*; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -48,15 +50,12 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers .KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.*; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.client.rest.OzoneException; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.protocolPB. StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.AfterClass; @@ -79,6 +78,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * This class is to test all the public facing APIs of Ozone Client. 
@@ -732,7 +732,7 @@ public void testPutKeyAndGetKeyThreeNodes() try { // try to read readKey(bucket, keyName, value); - Assert.fail("Expected exception not thrown"); + fail("Expected exception not thrown"); } catch (IOException e) { Assert.assertTrue(e.getMessage().contains("Failed to execute command")); Assert.assertTrue( @@ -914,7 +914,7 @@ public void testReadKeyWithCorruptedData() throws IOException { try { OzoneInputStream is = bucket.readKey(keyName); is.read(new byte[100]); - Assert.fail("Reading corrupted data should fail."); + fail("Reading corrupted data should fail."); } catch (OzoneChecksumException e) { GenericTestUtils.assertExceptionContains("Checksum mismatch", e); } @@ -1116,7 +1116,7 @@ public void testListBucketsOnEmptyVolume() OzoneVolume vol = store.getVolume(volume); Iterator buckets = vol.listBuckets(""); while(buckets.hasNext()) { - Assert.fail(); + fail(); } } @@ -1258,7 +1258,7 @@ public void testListKeyOnEmptyBucket() OzoneBucket buc = vol.getBucket(bucket); Iterator keys = buc.listKeys(""); while(keys.hasNext()) { - Assert.fail(); + fail(); } } @@ -1296,6 +1296,7 @@ public void testInitiateMultipartUploadWithReplicationInformationSet() throws assertNotNull(multipartInfo.getUploadID()); } + @Test public void testInitiateMultipartUploadWithDefaultReplication() throws IOException { @@ -1329,6 +1330,177 @@ public void testInitiateMultipartUploadWithDefaultReplication() throws } + @Test + public void testUploadPartWithNoOverride() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + String sampleData = "sample Value"; + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + + assertNotNull(multipartInfo); + String uploadID = multipartInfo.getUploadID(); + Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); + Assert.assertEquals(bucketName, multipartInfo.getBucketName()); + Assert.assertEquals(keyName, multipartInfo.getKeyName()); + assertNotNull(multipartInfo.getUploadID()); + + OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName, + sampleData.length(), 1, uploadID); + ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0, + sampleData.length()); + ozoneOutputStream.close(); + + OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream + .getCommitUploadPartInfo(); + + assertNotNull(commitUploadPartInfo); + String partName = commitUploadPartInfo.getPartName(); + assertNotNull(commitUploadPartInfo.getPartName()); + + } + + @Test + public void testUploadPartOverrideWithStandAlone() throws IOException { + + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + String sampleData = "sample Value"; + int partNumber = 1; + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + + assertNotNull(multipartInfo); + String uploadID = multipartInfo.getUploadID(); + Assert.assertEquals(volumeName, 
multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    String partName = commitUploadPartInfo.getPartName();
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    // Overwrite the part by creating a part key with the same part number.
+    sampleData = "sample Data Changed";
+    ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    // The new part name should differ from the old part name.
+    assertNotEquals("Part names should be different", partName,
+        commitUploadPartInfo.getPartName());
+  }
+
+  @Test
+  public void testUploadPartOverrideWithRatis() throws IOException {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String sampleData = "sample Value";
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+        ReplicationType.RATIS, ReplicationFactor.THREE);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+
+    int partNumber = 1;
+
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    String partName = commitUploadPartInfo.getPartName();
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    // Overwrite the part by creating a part key with the same part number.
+    sampleData = "sample Data Changed";
+    ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    // The new part name should differ from the old part name.
+    assertNotEquals("Part names should be different", partName,
+        commitUploadPartInfo.getPartName());
+  }
+
+  @Test
+  public void testNoSuchUploadError() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String sampleData = "sample Value";
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = "random";
+    try {
+      bucket.createMultipartKey(keyName, sampleData.length(), 1, uploadID);
+      fail("createMultipartKey with an unknown uploadID should fail");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains("NO_SUCH_MULTIPART_UPLOAD_ERROR",
+          ex);
+    }
+
+  }
+
+
   /**
    * Close OzoneClient and shutdown MiniOzoneCluster.
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 10c4adfad6..f5f3f1bbac 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.utils.BackgroundService;
@@ -190,4 +191,8 @@ List listKeys(String volumeName,
    */
  OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
+
+  OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs keyArgs, long clientID) throws IOException;
+
 }
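A note on the new KeyManager contract: the clientID argument is the open-key
session id handed out by openKey (OpenKeySession.getId()); KeyManagerImpl uses
it below to locate the in-flight part in the open key table. A minimal
server-side caller sketch, using only names introduced in this patch:

    // partArgs: an OmKeyArgs with isMultipartKey and the uploadID set,
    // as built by RpcClient#createMultipartKey earlier in this patch.
    OpenKeySession session = keyManager.openKey(partArgs);
    // ... the client writes the part data against session.getId() ...
    OmMultipartCommitUploadPartInfo info =
        keyManager.commitMultipartUploadPart(partArgs, session.getId());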
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 7e954d17c2..614d453716 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -205,17 +206,35 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     ReplicationType type = args.getType();
     long currentTime = Time.monotonicNowNanos();

-    // If user does not specify a replication strategy or
-    // replication factor, OM will use defaults.
-    if (factor == null) {
-      factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
-    }
-
-    if (type == null) {
-      type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
-    }
-
     try {
+      if (args.getIsMultipartKey()) {
+        // When the key is a multipart upload part, we take the replication
+        // type and factor from the original key that initiated the
+        // multipart upload. If no such open multipart key is found, we
+        // fail with NO_SUCH_MULTIPART_UPLOAD.
+        String uploadID = args.getMultipartUploadID();
+        Preconditions.checkNotNull(uploadID);
+        String multipartKey = metadataManager.getMultipartKey(volumeName,
+            bucketName, keyName, uploadID);
+        OmKeyInfo partKeyInfo = metadataManager.getOpenKeyTable().get(
+            multipartKey);
+        if (partKeyInfo == null) {
+          throw new OMException("No multipart upload exists with the specified "
+              + "uploadID " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+        } else {
+          factor = partKeyInfo.getFactor();
+          type = partKeyInfo.getType();
+        }
+      } else {
+        // If user does not specify a replication strategy or
+        // replication factor, OM will use defaults.
+        if (factor == null) {
+          factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
+        }
+        if (type == null) {
+          type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
+        }
+      }
       long requestedSize = Math.min(preallocateMax, args.getDataSize());
       List locations = new ArrayList<>();
       String objectKey = metadataManager.getOzoneKey(
@@ -254,31 +273,28 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
       // value, then this value is used, otherwise, we allocate a single block
       // which is the current size, if read by the client.
       long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
-      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      OmKeyInfo keyInfo;
       long openVersion;
-      if (keyInfo != null) {
-        // the key already exist, the new blocks will be added as new version
-        // when locations.size = 0, the new version will have identical blocks
-        // as its previous version
-        openVersion = keyInfo.addNewVersion(locations);
-        keyInfo.setDataSize(size + keyInfo.getDataSize());
-      } else {
-        // the key does not exist, create a new object, the new blocks are the
-        // version 0
-        keyInfo = new OmKeyInfo.Builder()
-            .setVolumeName(args.getVolumeName())
-            .setBucketName(args.getBucketName())
-            .setKeyName(args.getKeyName())
-            .setOmKeyLocationInfos(Collections.singletonList(
-                new OmKeyLocationInfoGroup(0, locations)))
-            .setCreationTime(Time.now())
-            .setModificationTime(Time.now())
-            .setDataSize(size)
-            .setReplicationType(type)
-            .setReplicationFactor(factor)
-            .build();
+      if (args.getIsMultipartKey()) {
+        // For an upload part we do not need to check the KeyTable, since a
+        // part is not an actual key; it is only a piece of one.
+        keyInfo = createKeyInfo(args, locations, factor, type, size);
         openVersion = 0;
+      } else {
+        keyInfo = metadataManager.getKeyTable().get(objectKey);
+        if (keyInfo != null) {
+          // the key already exist, the new blocks will be added as new version
+          // when locations.size = 0, the new version will have identical blocks
+          // as its previous version
+          openVersion = keyInfo.addNewVersion(locations);
+          keyInfo.setDataSize(size + keyInfo.getDataSize());
+        } else {
+          // the key does not exist, create a new object, the new blocks are the
+          // version 0
+          keyInfo = createKeyInfo(args, locations, factor, type, size);
+          openVersion = 0;
+        }
       }
       String openKey = metadataManager.getOpenKey(
           volumeName, bucketName, keyName, currentTime);
@@ -311,6 +327,33 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     }
   }
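This is also why RpcClient.createMultipartKey (earlier in this patch) sets no
replication type or factor on a part's OmKeyArgs: for parts, whatever was
recorded when the upload was initiated always wins. A condensed sketch of the
client-side args for a part, using only setters added in this patch (values
are illustrative):

    OmKeyArgs partArgs = new OmKeyArgs.Builder()
        .setVolumeName(volumeName)
        .setBucketName(bucketName)
        .setKeyName(keyName)
        .setIsMultipartKey(true)
        .setMultipartUploadID(uploadID)  // from initiateMultipartUpload
        .build();                        // no type/factor: OM resolves them
    OpenKeySession session = ozoneManagerClient.openKey(partArgs);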
+  /**
+   * Create OmKeyInfo object.
+   * @param keyArgs - arguments of the key being opened
+   * @param locations - preallocated block locations for the key
+   * @param factor - replication factor
+   * @param type - replication type
+   * @param size - data size of the key
+   * @return a new OmKeyInfo with a single (version 0) location group
+   */
+  private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs,
+                                  List locations,
+                                  ReplicationFactor factor,
+                                  ReplicationType type, long size) {
+    return new OmKeyInfo.Builder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, locations)))
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setDataSize(size)
+        .setReplicationType(type)
+        .setReplicationFactor(factor)
+        .build();
+  }
+
 @Override
 public void commitKey(OmKeyArgs args, long clientID) throws IOException {
   Preconditions.checkNotNull(args);
@@ -610,4 +653,80 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
+
+  @Override
+  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs omKeyArgs, long clientID) throws IOException {
+    Preconditions.checkNotNull(omKeyArgs);
+    String volumeName = omKeyArgs.getVolumeName();
+    String bucketName = omKeyArgs.getBucketName();
+    String keyName = omKeyArgs.getKeyName();
+    String uploadID = omKeyArgs.getMultipartUploadID();
+    int partNumber = omKeyArgs.getMultipartUploadPartNumber();
+
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    String partName;
+    try {
+      String multipartKey = metadataManager.getMultipartKey(volumeName,
+          bucketName, keyName, uploadID);
+      OmMultipartKeyInfo multipartKeyInfo = metadataManager
+          .getMultipartInfoTable().get(multipartKey);
+
+      String openKey = metadataManager.getOpenKey(
+          volumeName, bucketName, keyName, clientID);
+      OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(
+          openKey);
+
+      partName = keyName + clientID;
+      if (multipartKeyInfo == null) {
+        throw new OMException("No multipart upload exists with the specified "
+            + "uploadID " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+      } else {
+        PartKeyInfo oldPartKeyInfo =
+            multipartKeyInfo.getPartKeyInfo(partNumber);
+        PartKeyInfo.Builder partKeyInfo = PartKeyInfo.newBuilder();
+        partKeyInfo.setPartName(partName);
+        partKeyInfo.setPartNumber(partNumber);
+        partKeyInfo.setPartKeyInfo(keyInfo.getProtobuf());
+        multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());
+        if (oldPartKeyInfo == null) {
+          // This is the first time this part number is being added.
+          DBStore store = metadataManager.getStore();
+          try (BatchOperation batch = store.initBatchOperation()) {
+            metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
+            metadataManager.getMultipartInfoTable().putWithBatch(batch,
+                multipartKey, multipartKeyInfo);
+            store.commitBatchOperation(batch);
+          }
+        } else {
+          // If we already have this part, we are overwriting it. We need to
+          // do three things:
+          // add the old entry to the deleted table,
+          // remove the new entry from the open key table,
+          // and add the new entry into the list of part keys.
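+          // (All three updates go through one BatchOperation below, so a
+          // crash between steps cannot leave the old part both live and
+          // deleted; the overwrite either commits fully or not at all.)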
+          DBStore store = metadataManager.getStore();
+          try (BatchOperation batch = store.initBatchOperation()) {
+            metadataManager.getDeletedTable().putWithBatch(batch,
+                oldPartKeyInfo.getPartName(),
+                OmKeyInfo.getFromProtobuf(oldPartKeyInfo.getPartKeyInfo()));
+            metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
+            metadataManager.getMultipartInfoTable().putWithBatch(batch,
+                multipartKey, multipartKeyInfo);
+            store.commitBatchOperation(batch);
+          }
+        }
+      }
+    } catch (IOException ex) {
+      LOG.error("Upload part failed: volume:{} bucket:{} "
+          + "key:{} PartNumber: {}", volumeName, bucketName, keyName,
+          partNumber, ex);
+      throw new OMException(ex.getMessage(), ResultCodes.UPLOAD_PART_FAILED);
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+
+    return new OmMultipartCommitUploadPartInfo(partName);
+
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 9f9276756e..715ebb8e1f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -86,6 +86,8 @@ public class OMMetrics {
   private @Metric MutableCounterLong numGetServiceListFails;
   private @Metric MutableCounterLong numListS3BucketsFails;
   private @Metric MutableCounterLong numInitiateMultipartUploadFails;
+  private @Metric MutableCounterLong numCommitMultipartUploadParts;
+  private @Metric MutableCounterLong numCommitMultipartUploadPartFails;

   // Metrics for total number of volumes, buckets and keys

@@ -236,6 +238,15 @@ public void incNumInitiateMultipartUploadFails() {
     numInitiateMultipartUploadFails.incr();
   }

+  public void incNumCommitMultipartUploadParts() {
+    numKeyOps.incr();
+    numCommitMultipartUploadParts.incr();
+  }
+
+  public void incNumCommitMultipartUploadPartFails() {
+    numCommitMultipartUploadPartFails.incr();
+  }
+
   public void incNumGetServiceLists() {
     numGetServiceLists.incr();
   }
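If useful, the new counters can be asserted from a test in the usual Hadoop
metrics style (a sketch; it assumes the OMMetrics source is registered under
its simple class name, which is the metrics2 convention):

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
    import org.apache.hadoop.metrics2.MetricsRecordBuilder;

    MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
    // One successful part commit should bump the op counter once.
    assertCounter("NumCommitMultipartUploadParts", 1L, omMetrics);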
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8b0616c7a9..a044bc24d7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -1574,6 +1575,32 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
     return multipartInfo;
   }

+  @Override
+  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs keyArgs, long clientID) throws IOException {
+    boolean auditSuccess = false;
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+    metrics.incNumCommitMultipartUploadParts();
+    try {
+      commitUploadPartInfo = keyManager.commitMultipartUploadPart(keyArgs,
+          clientID);
+      auditSuccess = true;
+    } catch (IOException ex) {
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
+          .COMMIT_MULTIPART_UPLOAD_PARTKEY, (keyArgs == null) ? null : keyArgs
+          .toAuditMap(), ex));
+      metrics.incNumCommitMultipartUploadPartFails();
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, (keyArgs == null) ? null :
+            keyArgs.toAuditMap()));
+      }
+    }
+    return commitUploadPartInfo;
+  }
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 1d096f5052..bdf2fb50bf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -116,6 +116,8 @@ public enum ResultCodes {
     SCM_IN_CHILL_MODE,
     S3_BUCKET_ALREADY_EXISTS,
     S3_BUCKET_NOT_FOUND,
-    INITIATE_MULTIPART_UPLOAD_FAILED;
+    INITIATE_MULTIPART_UPLOAD_FAILED,
+    NO_SUCH_MULTIPART_UPLOAD,
+    UPLOAD_PART_FAILED;
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 13d54c5cbe..4490921ab0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -86,6 +87,10 @@
     .LocateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .LocateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -197,6 +202,11 @@ private Status exceptionToResponseStatus(IOException ex) {
       return Status.S3_BUCKET_NOT_FOUND;
     case INITIATE_MULTIPART_UPLOAD_FAILED:
       return Status.INITIATE_MULTIPART_UPLOAD_ERROR;
+    case NO_SUCH_MULTIPART_UPLOAD:
+      return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
+    case UPLOAD_PART_FAILED:
+      return Status.MULTIPART_UPLOAD_PARTFILE_ERROR;
+
     default:
       return Status.INTERNAL_ERROR;
     }
@@ -378,6 +388,9 @@ public LocateKeyResponse createKey(
         .setDataSize(keyArgs.getDataSize())
         .setType(type)
         .setFactor(factor)
+        .setIsMultipartKey(keyArgs.getIsMultipartKey())
+        .setMultipartUploadID(keyArgs.getMultipartUploadID())
+        .setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
         .build();
     if (keyArgs.hasDataSize()) {
       omKeyArgs.setDataSize(keyArgs.getDataSize());
@@ -684,4 +697,30 @@ public MultipartInfoInitiateResponse initiateMultiPartUpload(
     }
     return resp.build();
   }
+
+  @Override
+  public MultipartCommitUploadPartResponse
commitMultipartUploadPart( + RpcController controller, MultipartCommitUploadPartRequest request) { + MultipartCommitUploadPartResponse.Builder resp = + MultipartCommitUploadPartResponse.newBuilder(); + try { + KeyArgs keyArgs = request.getKeyArgs(); + OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setMultipartUploadID(keyArgs.getMultipartUploadID()) + .setIsMultipartKey(keyArgs.getIsMultipartKey()) + .setMultipartUploadPartNumber(keyArgs.getMultipartNumber()) + .build(); + OmMultipartCommitUploadPartInfo commitUploadPartInfo = + impl.commitMultipartUploadPart(omKeyArgs, request.getClientID()); + resp.setPartName(commitUploadPartInfo.getPartName()); + resp.setStatus(Status.OK); + } catch (IOException ex) { + resp.setStatus(exceptionToResponseStatus(ex)); + } + return resp.build(); + } + }
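Taken together, the client-side flow enabled by this patch looks like the
following (a sketch against the APIs added above; the volume, bucket, and key
setup is assumed to exist already):

    // 1. Start the multipart upload and remember the upload id.
    OmMultipartInfo info = bucket.initiateMultipartUpload(keyName,
        ReplicationType.RATIS, ReplicationFactor.THREE);
    String uploadID = info.getUploadID();

    // 2. Upload part 1. On close(), the stream commits through
    //    commitMultipartUploadPart instead of commitKey.
    byte[] data = "part one contents".getBytes(StandardCharsets.UTF_8);
    OzoneOutputStream out = bucket.createMultipartKey(keyName, data.length,
        1, uploadID);
    out.write(data, 0, data.length);
    out.close();

    // 3. The committed part name is what an S3 front end would report as
    //    the part's ETag.
    String partName = out.getCommitUploadPartInfo().getPartName();

Completing or aborting the multipart upload is out of scope here; this patch
wires up only the initiate and upload-part operations.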