HDDS-924. MultipartUpload: S3 APi for complete Multipart Upload. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
695e93c2bb
commit
999f31fd20
@ -65,6 +65,87 @@ Test Multipart Upload
|
|||||||
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
|
||||||
Should contain ${result} ETag
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
|
||||||
|
Test Multipart Upload Complete
|
||||||
|
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey1 --storage-class REDUCED_REDUNDANCY
|
||||||
|
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
|
||||||
|
Should contain ${result} ${BUCKET}
|
||||||
|
Should contain ${result} multipartKey
|
||||||
|
Should contain ${result} UploadId
|
||||||
|
|
||||||
|
#upload parts
|
||||||
|
${system} = Evaluate platform.system() platform
|
||||||
|
Run Keyword if '${system}' == 'Darwin' Create Random file for mac
|
||||||
|
Run Keyword if '${system}' == 'Linux' Create Random file for linux
|
||||||
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey1 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
|
||||||
|
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
Execute echo "Part2" > /tmp/part2
|
||||||
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey1 --part-number 2 --body /tmp/part2 --upload-id ${uploadID}
|
||||||
|
${eTag2} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
#complete multipart upload
|
||||||
|
${result} = Execute AWSS3APICli complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key multipartKey1 --multipart-upload 'Parts=[{ETag=${eTag1},PartNumber=1},{ETag=${eTag2},PartNumber=2}]'
|
||||||
|
Should contain ${result} ${BUCKET}
|
||||||
|
Should contain ${result} multipartKey1
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
#read file and check the key
|
||||||
|
${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key multipartKey1 /tmp/multipartKey1.result
|
||||||
|
Execute cat /tmp/part1 /tmp/part2 >> /tmp/multipartkey1
|
||||||
|
${checksumbefore} = Execute md5sum /tmp/multipartkey1 | awk '{print $1}'
|
||||||
|
${checksumafter} = Execute md5sum /tmp/multipartKey1.result | awk '{print $1}'
|
||||||
|
Should Be Equal ${checksumbefore} ${checksumafter}
|
||||||
|
|
||||||
|
|
||||||
|
Test Multipart Upload Complete Entity too small
|
||||||
|
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey2 --storage-class REDUCED_REDUNDANCY
|
||||||
|
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
|
||||||
|
Should contain ${result} ${BUCKET}
|
||||||
|
Should contain ${result} multipartKey
|
||||||
|
Should contain ${result} UploadId
|
||||||
|
|
||||||
|
#upload parts
|
||||||
|
Execute echo "Part1" > /tmp/part1
|
||||||
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey2 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
|
||||||
|
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
Execute echo "Part2" > /tmp/part2
|
||||||
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey2 --part-number 2 --body /tmp/part2 --upload-id ${uploadID}
|
||||||
|
${eTag2} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
#complete multipart upload
|
||||||
|
${result} = Execute AWSS3APICli and checkrc complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key multipartKey2 --multipart-upload 'Parts=[{ETag=${eTag1},PartNumber=1},{ETag=${eTag2},PartNumber=2}]' 255
|
||||||
|
Should contain ${result} EntityTooSmall
|
||||||
|
|
||||||
|
|
||||||
|
Test Multipart Upload Complete Invalid part
|
||||||
|
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey3 --storage-class REDUCED_REDUNDANCY
|
||||||
|
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
|
||||||
|
Should contain ${result} ${BUCKET}
|
||||||
|
Should contain ${result} multipartKey
|
||||||
|
Should contain ${result} UploadId
|
||||||
|
|
||||||
|
#upload parts
|
||||||
|
Execute echo "Part1" > /tmp/part1
|
||||||
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey3 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
|
||||||
|
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
Execute echo "Part2" > /tmp/part2
|
||||||
|
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey3 --part-number 2 --body /tmp/part2 --upload-id ${uploadID}
|
||||||
|
${eTag2} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
|
||||||
|
Should contain ${result} ETag
|
||||||
|
|
||||||
|
#complete multipart upload
|
||||||
|
${result} = Execute AWSS3APICli and checkrc complete-multipart-upload --upload-id ${uploadID} --bucket ${BUCKET} --key multipartKey3 --multipart-upload 'Parts=[{ETag=etag1,PartNumber=1},{ETag=etag2,PartNumber=2}]' 255
|
||||||
|
Should contain ${result} InvalidPart
|
||||||
|
|
||||||
|
|
||||||
Upload part with Incorrect uploadID
|
Upload part with Incorrect uploadID
|
||||||
Execute echo "Multipart upload" > /tmp/testfile
|
Execute echo "Multipart upload" > /tmp/testfile
|
||||||
${result} = Execute AWSS3APICli and checkrc upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/testfile --upload-id "random" 255
|
${result} = Execute AWSS3APICli and checkrc upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/testfile --upload-id "random" 255
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||||
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
||||||
@ -832,6 +833,10 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload(
|
|||||||
// than 5 mb.
|
// than 5 mb.
|
||||||
if (partsCount != partsMapSize &&
|
if (partsCount != partsMapSize &&
|
||||||
currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) {
|
currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) {
|
||||||
|
LOG.error("MultipartUpload: " + ozoneKey + "Part number: " +
|
||||||
|
partKeyInfo.getPartNumber() + "size " + currentPartKeyInfo
|
||||||
|
.getDataSize() + " is less than minimum part size " +
|
||||||
|
OzoneConsts.OM_MULTIPART_MIN_SIZE);
|
||||||
throw new OMException("Complete Multipart Upload Failed: Entity " +
|
throw new OMException("Complete Multipart Upload Failed: Entity " +
|
||||||
"too small: volume: " + volumeName + "bucket: " + bucketName
|
"too small: volume: " + volumeName + "bucket: " + bucketName
|
||||||
+ "key: " + keyName, ResultCodes.ENTITY_TOO_SMALL);
|
+ "key: " + keyName, ResultCodes.ENTITY_TOO_SMALL);
|
||||||
|
@ -42,6 +42,14 @@ public void filter(ContainerRequestContext requestContext) throws
|
|||||||
requestContext.getHeaders()
|
requestContext.getHeaders()
|
||||||
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (requestContext.getUriInfo().getQueryParameters()
|
||||||
|
.containsKey("uploadId")) {
|
||||||
|
//aws cli doesn't send proper Content-Type and by default POST requests
|
||||||
|
//processed as form-url-encoded. Here we can fix this.
|
||||||
|
requestContext.getHeaders()
|
||||||
|
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,77 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.s3.endpoint;
|
||||||
|
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request for Complete Multipart Upload request.
|
||||||
|
*/
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
@XmlRootElement(name = "CompleteMultipartUpload")
|
||||||
|
public class CompleteMultipartUploadRequest {
|
||||||
|
|
||||||
|
@XmlElement(name = "Part")
|
||||||
|
private List<Part> partList = new ArrayList<>();
|
||||||
|
|
||||||
|
public List<Part> getPartList() {
|
||||||
|
return partList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPartList(List<Part> partList) {
|
||||||
|
this.partList = partList;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JAXB entity for child element.
|
||||||
|
*/
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
@XmlRootElement(name = "Part")
|
||||||
|
public static class Part {
|
||||||
|
|
||||||
|
@XmlElement(name = "PartNumber")
|
||||||
|
private int partNumber;
|
||||||
|
|
||||||
|
@XmlElement(name = "ETag")
|
||||||
|
private String eTag;
|
||||||
|
|
||||||
|
public int getPartNumber() {
|
||||||
|
return partNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPartNumber(int partNumber) {
|
||||||
|
this.partNumber = partNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String geteTag() {
|
||||||
|
return eTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void seteTag(String eTag) {
|
||||||
|
this.eTag = eTag;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,78 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.s3.endpoint;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Complete Multipart Upload request response.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
@XmlRootElement(name = "CompleteMultipartUploadResult", namespace =
|
||||||
|
"http://s3.amazonaws.com/doc/2006-03-01/")
|
||||||
|
public class CompleteMultipartUploadResponse {
|
||||||
|
|
||||||
|
@XmlElement(name = "Location")
|
||||||
|
private String location;
|
||||||
|
|
||||||
|
@XmlElement(name = "Bucket")
|
||||||
|
private String bucket;
|
||||||
|
|
||||||
|
@XmlElement(name = "Key")
|
||||||
|
private String key;
|
||||||
|
|
||||||
|
@XmlElement(name = "ETag")
|
||||||
|
private String eTag;
|
||||||
|
|
||||||
|
public String getLocation() {
|
||||||
|
return location;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLocation(String location) {
|
||||||
|
this.location = location;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBucket() {
|
||||||
|
return bucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBucket(String bucket) {
|
||||||
|
this.bucket = bucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKey(String key) {
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getETag() {
|
||||||
|
return eTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setETag(String tag) {
|
||||||
|
this.eTag = tag;
|
||||||
|
}
|
||||||
|
}
|
@ -42,6 +42,8 @@
|
|||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||||
@ -51,6 +53,7 @@
|
|||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
|
||||||
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
|
||||||
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
|
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
|
||||||
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
||||||
@ -67,6 +70,7 @@
|
|||||||
import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED;
|
import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
|
||||||
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
|
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
|
||||||
import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
|
import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
|
||||||
import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
|
import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
|
||||||
@ -360,10 +364,31 @@ public Response delete(
|
|||||||
|
|
||||||
@POST
|
@POST
|
||||||
@Produces(MediaType.APPLICATION_XML)
|
@Produces(MediaType.APPLICATION_XML)
|
||||||
public Response initiateMultipartUpload(
|
public Response multipartUpload(
|
||||||
@PathParam("bucket") String bucket,
|
@PathParam("bucket") String bucket,
|
||||||
@PathParam("path") String key,
|
@PathParam("path") String key,
|
||||||
@QueryParam("uploads") String uploads) throws IOException, OS3Exception {
|
@QueryParam("uploads") String uploads,
|
||||||
|
@QueryParam("uploadId") @DefaultValue("") String uploadID,
|
||||||
|
CompleteMultipartUploadRequest request) throws IOException, OS3Exception {
|
||||||
|
if (!uploadID.equals("")) {
|
||||||
|
//Complete Multipart upload request.
|
||||||
|
return completeMultipartUpload(bucket, key, uploadID, request);
|
||||||
|
} else {
|
||||||
|
// Initiate Multipart upload request.
|
||||||
|
return initiateMultipartUpload(bucket, key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initiate Multipart upload request.
|
||||||
|
* @param bucket
|
||||||
|
* @param key
|
||||||
|
* @return Response
|
||||||
|
* @throws IOException
|
||||||
|
* @throws OS3Exception
|
||||||
|
*/
|
||||||
|
private Response initiateMultipartUpload(String bucket, String key) throws
|
||||||
|
IOException, OS3Exception {
|
||||||
try {
|
try {
|
||||||
OzoneBucket ozoneBucket = getBucket(bucket);
|
OzoneBucket ozoneBucket = getBucket(bucket);
|
||||||
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
|
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
|
||||||
@ -396,11 +421,73 @@ public Response initiateMultipartUpload(
|
|||||||
|
|
||||||
return Response.status(Status.OK).entity(
|
return Response.status(Status.OK).entity(
|
||||||
multipartUploadInitiateResponse).build();
|
multipartUploadInitiateResponse).build();
|
||||||
|
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
|
LOG.error("Error in Initiate Multipart Upload Request for bucket: " +
|
||||||
|
bucket + ", key: " + key, ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Complete Multipart upload request.
|
||||||
|
* @param bucket
|
||||||
|
* @param key
|
||||||
|
* @param uploadID
|
||||||
|
* @param multipartUploadRequest
|
||||||
|
* @return Response
|
||||||
|
* @throws IOException
|
||||||
|
* @throws OS3Exception
|
||||||
|
*/
|
||||||
|
private Response completeMultipartUpload(String bucket, String key, String
|
||||||
|
uploadID, CompleteMultipartUploadRequest multipartUploadRequest) throws
|
||||||
|
IOException, OS3Exception {
|
||||||
|
OzoneBucket ozoneBucket = getBucket(bucket);
|
||||||
|
Map<Integer, String> partsMap = new TreeMap<>();
|
||||||
|
List<CompleteMultipartUploadRequest.Part> partList =
|
||||||
|
multipartUploadRequest.getPartList();
|
||||||
|
|
||||||
|
for (CompleteMultipartUploadRequest.Part part : partList) {
|
||||||
|
partsMap.put(part.getPartNumber(), part.geteTag());
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("Parts map {}", partsMap.toString());
|
||||||
|
|
||||||
|
OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo;
|
||||||
|
try {
|
||||||
|
omMultipartUploadCompleteInfo = ozoneBucket.completeMultipartUpload(
|
||||||
|
key, uploadID, partsMap);
|
||||||
|
CompleteMultipartUploadResponse completeMultipartUploadResponse =
|
||||||
|
new CompleteMultipartUploadResponse();
|
||||||
|
completeMultipartUploadResponse.setBucket(bucket);
|
||||||
|
completeMultipartUploadResponse.setKey(key);
|
||||||
|
completeMultipartUploadResponse.setETag(omMultipartUploadCompleteInfo
|
||||||
|
.getHash());
|
||||||
|
// Location also setting as bucket name.
|
||||||
|
completeMultipartUploadResponse.setLocation(bucket);
|
||||||
|
return Response.status(Status.OK).entity(completeMultipartUploadResponse)
|
||||||
|
.build();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.error("Error in Complete Multipart Upload Request for bucket: " +
|
||||||
|
bucket + ", key: " + key, ex);
|
||||||
|
if (ex.getMessage().contains("MISMATCH_MULTIPART_LIST")) {
|
||||||
|
OS3Exception oex =
|
||||||
|
S3ErrorTable.newError(S3ErrorTable.INVALID_PART, key);
|
||||||
|
throw oex;
|
||||||
|
} else if (ex.getMessage().contains("MISSING_UPLOAD_PARTS")) {
|
||||||
|
OS3Exception oex =
|
||||||
|
S3ErrorTable.newError(S3ErrorTable.INVALID_PART_ORDER, key);
|
||||||
|
throw oex;
|
||||||
|
} else if (ex.getMessage().contains("NO_SUCH_MULTIPART_UPLOAD_ERROR")) {
|
||||||
|
OS3Exception os3Exception = S3ErrorTable.newError(NO_SUCH_UPLOAD,
|
||||||
|
uploadID);
|
||||||
|
throw os3Exception;
|
||||||
|
} else if (ex.getMessage().contains("ENTITY_TOO_SMALL")) {
|
||||||
|
OS3Exception os3Exception = S3ErrorTable.newError(ENTITY_TOO_SMALL,
|
||||||
|
key);
|
||||||
|
throw os3Exception;
|
||||||
|
}
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Response createMultipartKey(String bucket, String key, long length,
|
private Response createMultipartKey(String bucket, String key, long length,
|
||||||
|
@ -68,6 +68,22 @@ private S3ErrorTable() {
|
|||||||
"upload ID might be invalid, or the multipart upload might have " +
|
"upload ID might be invalid, or the multipart upload might have " +
|
||||||
"been aborted or completed.", HTTP_NOT_FOUND);
|
"been aborted or completed.", HTTP_NOT_FOUND);
|
||||||
|
|
||||||
|
public static final OS3Exception INVALID_PART = new OS3Exception(
|
||||||
|
"InvalidPart", "One or more of the specified parts could not be found." +
|
||||||
|
" The part might not have been uploaded, or the specified entity " +
|
||||||
|
"tag might not have matched the part's entity tag.", HTTP_BAD_REQUEST);
|
||||||
|
|
||||||
|
public static final OS3Exception INVALID_PART_ORDER = new OS3Exception(
|
||||||
|
"InvalidPartOrder", "The list of parts was not in ascending order. The " +
|
||||||
|
"parts list must be specified in order by part number.",
|
||||||
|
HTTP_BAD_REQUEST);
|
||||||
|
|
||||||
|
public static final OS3Exception ENTITY_TOO_SMALL = new OS3Exception(
|
||||||
|
"EntityTooSmall", "Your proposed upload is smaller than the minimum " +
|
||||||
|
"allowed object size. Each part must be at least 5 MB in size, except " +
|
||||||
|
"the last part.", HTTP_BAD_REQUEST);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance of Error.
|
* Create a new instance of Error.
|
||||||
* @param e Error Template
|
* @param e Error Template
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
|
||||||
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In-memory ozone bucket for testing.
|
* In-memory ozone bucket for testing.
|
||||||
@ -179,7 +181,7 @@ public void close() throws IOException {
|
|||||||
Part part = new Part(key + size,
|
Part part = new Part(key + size,
|
||||||
toByteArray());
|
toByteArray());
|
||||||
if (partList.get(key) == null) {
|
if (partList.get(key) == null) {
|
||||||
Map<Integer, Part> parts = new HashMap<>();
|
Map<Integer, Part> parts = new TreeMap<>();
|
||||||
parts.put(partNumber, part);
|
parts.put(partNumber, part);
|
||||||
partList.put(key, parts);
|
partList.put(key, parts);
|
||||||
} else {
|
} else {
|
||||||
@ -191,6 +193,36 @@ public void close() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OmMultipartUploadCompleteInfo completeMultipartUpload(String key,
|
||||||
|
String uploadID, Map<Integer, String> partsMap) throws IOException {
|
||||||
|
|
||||||
|
if (multipartUploadIdMap.get(key) == null) {
|
||||||
|
throw new IOException("NO_SUCH_MULTIPART_UPLOAD_ERROR");
|
||||||
|
} else {
|
||||||
|
final Map<Integer, Part> partsList = partList.get(key);
|
||||||
|
|
||||||
|
if (partsMap.size() != partsList.size()) {
|
||||||
|
throw new IOException("MISMATCH_MULTIPART_LIST");
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = 1;
|
||||||
|
for (Map.Entry<Integer, String> part: partsMap.entrySet()) {
|
||||||
|
if (part.getKey() != count) {
|
||||||
|
throw new IOException("MISSING_UPLOAD_PARTS");
|
||||||
|
} else if (!part.getValue().equals(
|
||||||
|
partsList.get(part.getKey()).getPartName())) {
|
||||||
|
throw new IOException("MISMATCH_MULTIPART_LIST");
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new OmMultipartUploadCompleteInfo(getVolumeName(), getName(), key,
|
||||||
|
DigestUtils.sha256Hex(key));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class used to hold part information in a upload part request.
|
* Class used to hold part information in a upload part request.
|
||||||
*/
|
*/
|
||||||
|
@ -60,7 +60,7 @@ public void testInitiateMultipartUpload() throws Exception {
|
|||||||
rest.setHeaders(headers);
|
rest.setHeaders(headers);
|
||||||
rest.setClient(client);
|
rest.setClient(client);
|
||||||
|
|
||||||
Response response = rest.initiateMultipartUpload(bucket, key, "");
|
Response response = rest.multipartUpload(bucket, key, "", "", null);
|
||||||
|
|
||||||
assertEquals(response.getStatus(), 200);
|
assertEquals(response.getStatus(), 200);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
@ -69,7 +69,7 @@ public void testInitiateMultipartUpload() throws Exception {
|
|||||||
String uploadID = multipartUploadInitiateResponse.getUploadID();
|
String uploadID = multipartUploadInitiateResponse.getUploadID();
|
||||||
|
|
||||||
// Calling again should return different uploadID.
|
// Calling again should return different uploadID.
|
||||||
response = rest.initiateMultipartUpload(bucket, key, "");
|
response = rest.multipartUpload(bucket, key, "", "", null);
|
||||||
assertEquals(response.getStatus(), 200);
|
assertEquals(response.getStatus(), 200);
|
||||||
multipartUploadInitiateResponse =
|
multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
|
@ -0,0 +1,222 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.s3.endpoint;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.client.OzoneClientStub;
|
||||||
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
|
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import javax.ws.rs.core.HttpHeaders;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest.Part;
|
||||||
|
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to test Multipart upload end to end.
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class TestMultipartUploadComplete {
|
||||||
|
|
||||||
|
private final static ObjectEndpoint REST = new ObjectEndpoint();;
|
||||||
|
private final static String BUCKET = "s3bucket";
|
||||||
|
private final static String KEY = "key1";
|
||||||
|
private final static OzoneClientStub CLIENT = new OzoneClientStub();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
|
||||||
|
CLIENT.getObjectStore().createS3Bucket("ozone", BUCKET);
|
||||||
|
|
||||||
|
|
||||||
|
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
|
||||||
|
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
|
||||||
|
"STANDARD");
|
||||||
|
|
||||||
|
REST.setHeaders(headers);
|
||||||
|
REST.setClient(CLIENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String initiateMultipartUpload(String key) throws IOException,
|
||||||
|
OS3Exception {
|
||||||
|
Response response = REST.multipartUpload(BUCKET, key, "", "", null);
|
||||||
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
|
String uploadID = multipartUploadInitiateResponse.getUploadID();
|
||||||
|
|
||||||
|
assertEquals(response.getStatus(), 200);
|
||||||
|
|
||||||
|
return uploadID;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Part uploadPart(String key, String uploadID, int partNumber, String
|
||||||
|
content) throws IOException, OS3Exception {
|
||||||
|
ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes());
|
||||||
|
Response response = REST.put(BUCKET, key, content.length(), partNumber,
|
||||||
|
uploadID, body);
|
||||||
|
assertEquals(response.getStatus(), 200);
|
||||||
|
assertNotNull(response.getHeaderString("ETag"));
|
||||||
|
Part part = new Part();
|
||||||
|
part.seteTag(response.getHeaderString("ETag"));
|
||||||
|
part.setPartNumber(partNumber);
|
||||||
|
|
||||||
|
return part;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeMultipartUpload(String key,
|
||||||
|
CompleteMultipartUploadRequest completeMultipartUploadRequest,
|
||||||
|
String uploadID) throws IOException, OS3Exception {
|
||||||
|
Response response = REST.multipartUpload(BUCKET, key, "", uploadID,
|
||||||
|
completeMultipartUploadRequest);
|
||||||
|
|
||||||
|
assertEquals(response.getStatus(), 200);
|
||||||
|
|
||||||
|
CompleteMultipartUploadResponse completeMultipartUploadResponse =
|
||||||
|
(CompleteMultipartUploadResponse) response.getEntity();
|
||||||
|
|
||||||
|
assertEquals(completeMultipartUploadResponse.getBucket(), BUCKET);
|
||||||
|
assertEquals(completeMultipartUploadResponse.getKey(), KEY);
|
||||||
|
assertEquals(completeMultipartUploadResponse.getLocation(), BUCKET);
|
||||||
|
assertNotNull(completeMultipartUploadResponse.getETag());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipart() throws Exception {
|
||||||
|
|
||||||
|
// Initiate multipart upload
|
||||||
|
String uploadID = initiateMultipartUpload(KEY);
|
||||||
|
|
||||||
|
List<Part> partsList = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
// Upload parts
|
||||||
|
String content = "Multipart Upload 1";
|
||||||
|
int partNumber = 1;
|
||||||
|
|
||||||
|
Part part1 = uploadPart(KEY, uploadID, partNumber, content);
|
||||||
|
partsList.add(part1);
|
||||||
|
|
||||||
|
content = "Multipart Upload 2";
|
||||||
|
partNumber = 2;
|
||||||
|
Part part2 = uploadPart(KEY, uploadID, partNumber, content);
|
||||||
|
partsList.add(part2);
|
||||||
|
|
||||||
|
// complete multipart upload
|
||||||
|
CompleteMultipartUploadRequest completeMultipartUploadRequest = new
|
||||||
|
CompleteMultipartUploadRequest();
|
||||||
|
completeMultipartUploadRequest.setPartList(partsList);
|
||||||
|
|
||||||
|
|
||||||
|
completeMultipartUpload(KEY, completeMultipartUploadRequest,
|
||||||
|
uploadID);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipartInvalidPartOrderError() throws Exception {
|
||||||
|
|
||||||
|
// Initiate multipart upload
|
||||||
|
String key = UUID.randomUUID().toString();
|
||||||
|
String uploadID = initiateMultipartUpload(key);
|
||||||
|
|
||||||
|
List<Part> partsList = new ArrayList<>();
|
||||||
|
|
||||||
|
// Upload parts
|
||||||
|
String content = "Multipart Upload 1";
|
||||||
|
int partNumber = 1;
|
||||||
|
|
||||||
|
Part part1 = uploadPart(key, uploadID, partNumber, content);
|
||||||
|
// Change part number
|
||||||
|
part1.setPartNumber(3);
|
||||||
|
partsList.add(part1);
|
||||||
|
|
||||||
|
content = "Multipart Upload 2";
|
||||||
|
partNumber = 2;
|
||||||
|
|
||||||
|
Part part2 = uploadPart(key, uploadID, partNumber, content);
|
||||||
|
partsList.add(part2);
|
||||||
|
|
||||||
|
// complete multipart upload
|
||||||
|
CompleteMultipartUploadRequest completeMultipartUploadRequest = new
|
||||||
|
CompleteMultipartUploadRequest();
|
||||||
|
completeMultipartUploadRequest.setPartList(partsList);
|
||||||
|
try {
|
||||||
|
completeMultipartUpload(key, completeMultipartUploadRequest, uploadID);
|
||||||
|
fail("testMultipartInvalidPartOrderError");
|
||||||
|
} catch (OS3Exception ex) {
|
||||||
|
assertEquals(ex.getCode(), S3ErrorTable.INVALID_PART_ORDER.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipartInvalidPartError() throws Exception {
|
||||||
|
|
||||||
|
// Initiate multipart upload
|
||||||
|
String key = UUID.randomUUID().toString();
|
||||||
|
String uploadID = initiateMultipartUpload(key);
|
||||||
|
|
||||||
|
List<Part> partsList = new ArrayList<>();
|
||||||
|
|
||||||
|
// Upload parts
|
||||||
|
String content = "Multipart Upload 1";
|
||||||
|
int partNumber = 1;
|
||||||
|
|
||||||
|
Part part1 = uploadPart(key, uploadID, partNumber, content);
|
||||||
|
// Change part number
|
||||||
|
part1.seteTag("random");
|
||||||
|
partsList.add(part1);
|
||||||
|
|
||||||
|
content = "Multipart Upload 2";
|
||||||
|
partNumber = 2;
|
||||||
|
|
||||||
|
Part part2 = uploadPart(key, uploadID, partNumber, content);
|
||||||
|
partsList.add(part2);
|
||||||
|
|
||||||
|
// complete multipart upload
|
||||||
|
CompleteMultipartUploadRequest completeMultipartUploadRequest = new
|
||||||
|
CompleteMultipartUploadRequest();
|
||||||
|
completeMultipartUploadRequest.setPartList(partsList);
|
||||||
|
try {
|
||||||
|
completeMultipartUpload(key, completeMultipartUploadRequest, uploadID);
|
||||||
|
fail("testMultipartInvalidPartOrderError");
|
||||||
|
} catch (OS3Exception ex) {
|
||||||
|
assertEquals(ex.getCode(), S3ErrorTable.INVALID_PART.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -67,7 +67,7 @@ public static void setUp() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testPartUpload() throws Exception {
|
public void testPartUpload() throws Exception {
|
||||||
|
|
||||||
Response response = REST.initiateMultipartUpload(BUCKET, KEY, "");
|
Response response = REST.multipartUpload(BUCKET, KEY, "", "", null);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
@ -86,7 +86,7 @@ public void testPartUpload() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testPartUploadWithOverride() throws Exception {
|
public void testPartUploadWithOverride() throws Exception {
|
||||||
|
|
||||||
Response response = REST.initiateMultipartUpload(BUCKET, KEY, "");
|
Response response = REST.multipartUpload(BUCKET, KEY, "", "", null);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
|
Loading…
Reference in New Issue
Block a user