From 90e5eb0a48083650fce92272a92a57cdb8eaa0b0 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Wed, 31 Jul 2019 09:18:40 -0700 Subject: [PATCH] HDDS-1849. Implement S3 Complete MPU request to use Cache and DoubleBuffer. (#1181) --- .../ratis/utils/OzoneManagerRatisUtils.java | 3 + .../S3MultipartUploadCompleteRequest.java | 314 ++++++++++++++++++ .../S3MultipartUploadCommitPartResponse.java | 3 +- .../S3MultipartUploadCompleteResponse.java | 46 +++ .../OzoneManagerHARequestHandlerImpl.java | 1 + .../ozone/om/request/TestOMRequestUtils.java | 22 ++ .../s3/multipart/TestS3MultipartRequest.java | 23 ++ .../TestS3MultipartUploadCompleteRequest.java | 177 ++++++++++ 8 files changed, 588 insertions(+), 1 deletion(-) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 4d99b6685f..aef189c02f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest; import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest; import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest; +import 
org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest; @@ -114,6 +115,8 @@ public static OMClientRequest createClientRequest(OMRequest omRequest) { return new S3MultipartUploadCommitPartRequest(omRequest); case AbortMultiPartUpload: return new S3MultipartUploadAbortRequest(omRequest); + case CompleteMultiPartUpload: + return new S3MultipartUploadCompleteRequest(omRequest); default: // TODO: will update once all request types are implemented. return null; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java new file mode 100644 index 0000000000..f263c79efd --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -0,0 +1,314 @@ +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import com.google.common.base.Optional; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.audit.OMAction; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; 
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCompleteResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .PartKeyInfo; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.db.cache.CacheKey; +import org.apache.hadoop.utils.db.cache.CacheValue; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; + +/** + * Handle Multipart upload complete request. 
+ */ +public class S3MultipartUploadCompleteRequest extends OMKeyRequest { + + private static final Logger LOG = + LoggerFactory.getLogger(S3MultipartUploadCompleteRequest.class); + + public S3MultipartUploadCompleteRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { + MultipartUploadCompleteRequest multipartUploadCompleteRequest = + getOmRequest().getCompleteMultiPartUploadRequest(); + + KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs(); + + return getOmRequest().toBuilder() + .setCompleteMultiPartUploadRequest(multipartUploadCompleteRequest + .toBuilder().setKeyArgs(keyArgs.toBuilder() + .setModificationTime(Time.now()))) + .setUserInfo(getUserInfo()).build(); + + } + + @Override + @SuppressWarnings("methodlength") + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long transactionLogIndex, + OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) { + MultipartUploadCompleteRequest multipartUploadCompleteRequest = + getOmRequest().getCompleteMultiPartUploadRequest(); + + KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs(); + + List partsList = + multipartUploadCompleteRequest.getPartsListList(); + + String volumeName = keyArgs.getVolumeName(); + String bucketName = keyArgs.getBucketName(); + String keyName = keyArgs.getKeyName(); + String uploadID = keyArgs.getMultipartUploadID(); + + ozoneManager.getMetrics().incNumCompleteMultipartUploads(); + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + + boolean acquiredLock = false; + OMResponse.Builder omResponse = OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload) + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .setSuccess(true); + OMClientResponse omClientResponse = null; + IOException exception = null; + OmMultipartUploadList multipartUploadList = null; + try { + // check Acl + if 
(ozoneManager.getAclsEnabled()) { + checkAcls(ozoneManager, OzoneObj.ResourceType.KEY, + OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE, + volumeName, bucketName, keyName); + } + + TreeMap partsMap = new TreeMap<>(); + for (OzoneManagerProtocolProtos.Part part : partsList) { + partsMap.put(part.getPartNumber(), part.getPartName()); + } + + multipartUploadList = new OmMultipartUploadList(partsMap); + + acquiredLock = omMetadataManager.getLock().acquireLock(BUCKET_LOCK, + volumeName, bucketName); + + validateBucketAndVolume(omMetadataManager, volumeName, bucketName); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, uploadID); + String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, + keyName); + OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey); + + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + + if (multipartKeyInfo == null) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); + } + TreeMap partKeyInfoMap = + multipartKeyInfo.getPartKeyInfoMap(); + + TreeMap multipartMap = multipartUploadList + .getMultipartMap(); + + // Last key in the map should be having key value as size, as map's + // are sorted. Last entry in both maps should have partNumber as size + // of the map. As we have part entries 1, 2, 3, 4 and then we get + // complete multipart upload request so the map last entry should have 4, + // if it is having value greater or less than map size, then there is + // some thing wrong throw error. 
+ + Map.Entry multipartMapLastEntry = multipartMap + .lastEntry(); + Map.Entry partKeyInfoLastEntry = + partKeyInfoMap.lastEntry(); + if (partKeyInfoMap.size() != multipartMap.size()) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + OMException.ResultCodes.MISMATCH_MULTIPART_LIST); + } + + // Last entry part Number should be the size of the map, otherwise this + // means we have missing some parts but we got a complete request. + if (multipartMapLastEntry.getKey() != partKeyInfoMap.size() || + partKeyInfoLastEntry.getKey() != partKeyInfoMap.size()) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + OMException.ResultCodes.MISSING_UPLOAD_PARTS); + } + HddsProtos.ReplicationType type = partKeyInfoLastEntry.getValue() + .getPartKeyInfo().getType(); + HddsProtos.ReplicationFactor factor = partKeyInfoLastEntry.getValue() + .getPartKeyInfo().getFactor(); + List< OmKeyLocationInfo > locations = new ArrayList<>(); + long size = 0; + int partsCount =1; + int partsMapSize = partKeyInfoMap.size(); + for(Map.Entry partKeyInfoEntry : partKeyInfoMap + .entrySet()) { + int partNumber = partKeyInfoEntry.getKey(); + PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue(); + // Check we have all parts to complete multipart upload and also + // check partNames provided match with actual part names + String providedPartName = multipartMap.get(partNumber); + String actualPartName = partKeyInfo.getPartName(); + if (partNumber == partsCount) { + if (!actualPartName.equals(providedPartName)) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + OMException.ResultCodes.MISMATCH_MULTIPART_LIST); + } + OmKeyInfo currentPartKeyInfo = OmKeyInfo + .getFromProtobuf(partKeyInfo.getPartKeyInfo()); + // Check if any part size is less than 5mb, last part 
can be less + // than 5 mb. + if (partsCount != partsMapSize && + currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) { + LOG.error("MultipartUpload: " + ozoneKey + "Part number: " + + partKeyInfo.getPartNumber() + "size " + currentPartKeyInfo + .getDataSize() + " is less than minimum part size " + + OzoneConsts.OM_MULTIPART_MIN_SIZE); + throw new OMException("Complete Multipart Upload Failed: Entity " + + "too small: volume: " + volumeName + "bucket: " + bucketName + + "key: " + keyName, OMException.ResultCodes.ENTITY_TOO_SMALL); + } + // As all part keys will have only one version. + OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo + .getKeyLocationVersions().get(0); + locations.addAll(currentKeyInfoGroup.getLocationList()); + size += currentPartKeyInfo.getDataSize(); + } else { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + OMException.ResultCodes.MISSING_UPLOAD_PARTS); + } + partsCount++; + } + if (omKeyInfo == null) { + // This is a newly added key, it does not have any versions. + OmKeyLocationInfoGroup keyLocationInfoGroup = new + OmKeyLocationInfoGroup(0, locations); + // A newly created key, this is the first version. + omKeyInfo = new OmKeyInfo.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setKeyName(keyName) + .setReplicationFactor(factor).setReplicationType(type) + .setCreationTime(keyArgs.getModificationTime()) + .setModificationTime(keyArgs.getModificationTime()) + .setDataSize(size) + .setOmKeyLocationInfos( + Collections.singletonList(keyLocationInfoGroup)) + .setAcls(keyArgs.getAclsList()).build(); + } else { + // Already a version exists, so we should add it as a new version. + // But now as versioning is not supported, just following the commit + // key approach. 
When versioning support comes, then we can uncomment + below code keyInfo.addNewVersion(locations); + omKeyInfo.updateLocationInfoList(locations); + omKeyInfo.setModificationTime(keyArgs.getModificationTime()); + } + + updateCache(omMetadataManager, ozoneKey, multipartKey, omKeyInfo, + transactionLogIndex); + + omResponse.setCompleteMultiPartUploadResponse( + MultipartUploadCompleteResponse.newBuilder() + .setVolume(volumeName) + .setBucket(bucketName) + .setKey(keyName) + .setHash(DigestUtils.sha256Hex(keyName))); + + omClientResponse = new S3MultipartUploadCompleteResponse(multipartKey, + omKeyInfo, omResponse.build()); + + } catch (IOException ex) { + exception = ex; + omClientResponse = new S3MultipartUploadCompleteResponse(null, null, + createErrorOMResponse(omResponse, exception)); + } finally { + if (omClientResponse != null) { + omClientResponse.setFlushFuture( + ozoneManagerDoubleBufferHelper.add(omClientResponse, + transactionLogIndex)); + } + if (acquiredLock) { + omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, + bucketName); + } + } + + Map auditMap = buildKeyArgsAuditMap(keyArgs); + if (multipartUploadList != null) { + auditMap.put(OzoneConsts.MULTIPART_LIST, multipartUploadList + .getMultipartMap().toString()); + } + + // audit log + auditLog(ozoneManager.getAuditLogger(), buildAuditMessage( + OMAction.COMPLETE_MULTIPART_UPLOAD, auditMap, exception, + getOmRequest().getUserInfo())); + + if (exception == null) { + LOG.debug("MultipartUpload Complete request is successful for Key: {} " + + "in Volume/Bucket {}/{}", keyName, volumeName, bucketName); + } else { + LOG.error("MultipartUpload Complete request failed for Key: {} " + + "in Volume/Bucket {}/{}", keyName, volumeName, bucketName, exception); + ozoneManager.getMetrics().incNumCompleteMultipartUploadFails(); + } + + return omClientResponse; + } + + private void updateCache(OMMetadataManager omMetadataManager, + String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo, + long 
transactionLogIndex) { + // Update cache. + // 1. Add key entry to key table. + // 2. Delete multipartKey entry from openKeyTable and multipartInfo table. + omMetadataManager.getKeyTable().addCacheEntry( + new CacheKey<>(ozoneKey), + new CacheValue<>(Optional.of(omKeyInfo), transactionLogIndex)); + + omMetadataManager.getOpenKeyTable().addCacheEntry( + new CacheKey<>(multipartKey), + new CacheValue<>(Optional.absent(), transactionLogIndex)); + omMetadataManager.getMultipartInfoTable().addCacheEntry( + new CacheKey<>(multipartKey), + new CacheValue<>(Optional.absent(), transactionLogIndex)); + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java index 2d76a4081e..9091e7f236 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java @@ -61,7 +61,6 @@ public S3MultipartUploadCommitPartResponse(String multipartKey, this.oldMultipartKeyInfo = oldPartKeyInfo; } - @Override public void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { @@ -105,5 +104,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager, } } + + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java new file mode 100644 index 0000000000..e0b309fcd0 --- /dev/null +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java @@ -0,0 +1,46 @@ +package org.apache.hadoop.ozone.om.response.s3.multipart; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.utils.db.BatchOperation; + +import javax.annotation.Nullable; + +/** + * Response for Multipart Upload Complete request. + */ +public class S3MultipartUploadCompleteResponse extends OMClientResponse { + private String multipartKey; + private OmKeyInfo omKeyInfo; + + + public S3MultipartUploadCompleteResponse(@Nullable String multipartKey, + @Nullable OmKeyInfo omKeyInfo, OMResponse omResponse) { + super(omResponse); + this.multipartKey = multipartKey; + this.omKeyInfo = omKeyInfo; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + + if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) { + omMetadataManager.getKeyTable().putWithBatch(batchOperation, + omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(), + omKeyInfo.getBucketName(), omKeyInfo.getKeyName()), omKeyInfo); + omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation, + multipartKey); + omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation, + multipartKey); + } + } +} + + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java index cceff5684a..f3f27a65a0 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java @@ -70,6 +70,7 @@ public OMResponse handleApplyTransaction(OMRequest omRequest, case InitiateMultiPartUpload: case CommitMultiPartUpload: case AbortMultiPartUpload: + case CompleteMultiPartUpload: //TODO: We don't need to pass transactionID, this will be removed when // complete write requests is changed to new model. And also we can // return OMClientResponse, then adding to doubleBuffer can be taken diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java index 162d4d1342..581c0836ca 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java @@ -39,6 +39,8 @@ .MultipartUploadAbortRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartCommitUploadPartRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -366,4 +368,24 @@ public static OMRequest createAbortMPURequest(String volumeName, .setAbortMultiPartUploadRequest(multipartUploadAbortRequest).build(); } + public static OMRequest createCompleteMPURequest(String volumeName, + String bucketName, String keyName, String multipartUploadID, + List partList) { + KeyArgs.Builder keyArgs = + KeyArgs.newBuilder().setVolumeName(volumeName) + .setKeyName(keyName) + .setBucketName(bucketName) + 
.setMultipartUploadID(multipartUploadID); + + MultipartUploadCompleteRequest multipartUploadCompleteRequest = + MultipartUploadCompleteRequest.newBuilder().setKeyArgs(keyArgs) + .addAllPartsList(partList).build(); + + return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString()) + .setCmdType(OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload) + .setCompleteMultiPartUploadRequest(multipartUploadCompleteRequest) + .build(); + + } + } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java index 69335cf74f..9950027462 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.request.s3.multipart; import java.io.IOException; +import java.util.List; import org.junit.After; import org.junit.Assert; @@ -38,6 +39,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; @@ -181,5 +183,26 @@ protected OMRequest doPreExecuteAbortMPU( } + protected OMRequest doPreExecuteCompleteMPU(String volumeName, + String bucketName, String keyName, String multipartUploadID, + List partList) throws IOException { + + OMRequest omRequest = + TestOMRequestUtils.createCompleteMPURequest(volumeName, bucketName, + keyName, multipartUploadID, partList); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + new 
S3MultipartUploadCompleteRequest(omRequest); + + OMRequest modifiedRequest = + s3MultipartUploadCompleteRequest.preExecute(ozoneManager); + + // UserInfo and modification time is set. + Assert.assertNotEquals(omRequest, modifiedRequest); + + return modifiedRequest; + + } + } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java new file mode 100644 index 0000000000..0f6978979b --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java @@ -0,0 +1,177 @@ +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part; +import org.apache.hadoop.util.Time; + + +/** + * Tests S3 Multipart Upload Complete request. 
+ */ +public class TestS3MultipartUploadCompleteRequest + extends TestS3MultipartRequest { + + @Test + public void testPreExecute() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + doPreExecuteCompleteMPU(volumeName, bucketName, keyName, + UUID.randomUUID().toString(), new ArrayList<>()); + } + + @Test + public void testValidateAndUpdateCacheSuccess() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + new S3InitiateMultipartUploadRequest(initiateMPURequest); + + OMClientResponse omClientResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L, ozoneManagerDoubleBufferHelper); + + long clientID = Time.now(); + String multipartUploadID = omClientResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + new S3MultipartUploadCommitPartRequest(commitMultipartRequest); + + // Add key to open key table. 
+ TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, + keyName, clientID, HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, omMetadataManager); + + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L, ozoneManagerDoubleBufferHelper); + + List partList = new ArrayList<>(); + + partList.add(Part.newBuilder().setPartName( + omMetadataManager.getOzoneKey(volumeName, bucketName, keyName) + + clientID).setPartNumber(1).build()); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, multipartUploadID, partList); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + new S3MultipartUploadCompleteRequest(completeMultipartRequest); + + omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, + 3L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + + Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey)); + Assert.assertNull( + omMetadataManager.getMultipartInfoTable().get(multipartKey)); + Assert.assertNotNull(omMetadataManager.getKeyTable().get( + omMetadataManager.getOzoneKey(volumeName, bucketName, keyName))); + } + + @Test + public void testValidateAndUpdateCacheVolumeNotFound() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + List partList = new ArrayList<>(); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, UUID.randomUUID().toString(), partList); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + new S3MultipartUploadCompleteRequest(completeMultipartRequest); + + OMClientResponse 
omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, + 3L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND, + omClientResponse.getOMResponse().getStatus()); + + } + + @Test + public void testValidateAndUpdateCacheBucketNotFound() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager); + List partList = new ArrayList<>(); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, UUID.randomUUID().toString(), partList); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + new S3MultipartUploadCompleteRequest(completeMultipartRequest); + + OMClientResponse omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, + 3L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND, + omClientResponse.getOMResponse().getStatus()); + + } + + @Test + public void testValidateAndUpdateCacheNoSuchMultipartUploadError() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + List partList = new ArrayList<>(); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, UUID.randomUUID().toString(), partList); + + // Doing complete multipart upload request with out initiate. 
+ S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + new S3MultipartUploadCompleteRequest(completeMultipartRequest); + + OMClientResponse omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, + 3L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals( + OzoneManagerProtocolProtos.Status.NO_SUCH_MULTIPART_UPLOAD_ERROR, + omClientResponse.getOMResponse().getStatus()); + + } +} +