HDDS-902. MultipartUpload: S3 API for uploading a part file. Contributed by Bharat Viswanadham.

This commit is contained in:
Márton Elek 2019-01-07 12:39:46 +01:00
parent 992dd9d189
commit d66925a315
7 changed files with 341 additions and 16 deletions

View File

@ -21,22 +21,51 @@ Resource ../commonlib.robot
Resource commonawslib.robot
Test Setup Setup s3 tests
*** Keywords ***
Create Random file for mac
Execute dd if=/dev/urandom of=/tmp/part1 bs=1m count=5
Create Random file for linux
Execute dd if=/dev/urandom of=/tmp/part1 bs=1M count=5
*** Variables ***
${ENDPOINT_URL} http://s3g:9878
${BUCKET} generated
*** Test Cases ***
Initiate Multipart Upload
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey
Test Multipart Upload
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey --storage-class REDUCED_REDUNDANCY
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
Should contain ${result} ${BUCKET}
Should contain ${result} multipartKey
Should contain ${result} UploadId
# initiate again
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey --storage-class REDUCED_REDUNDANCY
${nextUploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
Should contain ${result} ${BUCKET}
Should contain ${result} multipartKey
Should contain ${result} UploadId
Should Not Be Equal ${uploadID} ${nextUploadID}
# upload part
# each part should be minimum 5mb, other wise during complete multipart
# upload we get error entity too small. So, considering further complete
# multipart upload, uploading each part as 5MB file, exception is for last part
${system} = Evaluate platform.system() platform
Run Keyword if '${system}' == 'Darwin' Create Random file for mac
Run Keyword if '${system}' == 'Linux' Create Random file for linux
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
Should contain ${result} ETag
# override part
Run Keyword if '${system}' == 'Darwin' Create Random file for mac
Run Keyword if '${system}' == 'Linux' Create Random file for linux
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID}
Should contain ${result} ETag
Upload part with Incorrect uploadID
Execute echo "Multipart upload" > /tmp/testfile
${result} = Execute AWSS3APICli and checkrc upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/testfile --upload-id "random" 255
Should contain ${result} NoSuchUpload

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.s3.endpoint;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.HeaderParam;
@ -48,6 +49,7 @@
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
@ -64,6 +66,8 @@
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH;
import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED;
import org.apache.commons.io.IOUtils;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
@ -108,9 +112,18 @@ public Response put(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@HeaderParam("Content-Length") long length,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
InputStream body) throws IOException, OS3Exception {
OzoneOutputStream output = null;
if (!uploadID.equals("")) {
// If uploadID is specified, it is a request for upload part
return createMultipartKey(bucketName, keyPath, length,
partNumber, uploadID, body);
}
try {
String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
@ -390,6 +403,32 @@ public Response initiateMultipartUpload(
}
private Response createMultipartKey(String bucket, String key, long length,
int partNumber, String uploadID,
InputStream body)
throws IOException, OS3Exception {
try {
OzoneBucket ozoneBucket = getBucket(bucket);
OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
key, length, partNumber, uploadID);
IOUtils.copy(body, ozoneOutputStream);
ozoneOutputStream.close();
OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
ozoneOutputStream.getCommitUploadPartInfo();
return Response.status(Status.OK).header("ETag",
omMultipartCommitUploadPartInfo.getPartName()).build();
} catch (IOException ex) {
if (ex.getMessage().contains("NO_SUCH_MULTIPART_UPLOAD_ERROR")) {
OS3Exception os3Exception = S3ErrorTable.newError(NO_SUCH_UPLOAD,
uploadID);
throw os3Exception;
}
throw ex;
}
}
@VisibleForTesting
public void setHeaders(HttpHeaders headers) {
this.headers = headers;

View File

@ -63,6 +63,11 @@ private S3ErrorTable() {
"InvalidRange", "The requested range is not satisfiable",
RANGE_NOT_SATISFIABLE);
public static final OS3Exception NO_SUCH_UPLOAD = new OS3Exception(
"NoSuchUpload", "The specified multipart upload does not exist. The " +
"upload ID might be invalid, or the multipart upload might have " +
"been aborted or completed.", HTTP_NOT_FOUND);
/**
* Create a new instance of Error.
* @param e Error Template

View File

@ -49,6 +49,9 @@ public class OzoneBucketStub extends OzoneBucket {
private Map<String, byte[]> keyContents = new HashMap<>();
private Map<String, String> multipartUploadIdMap = new HashMap<>();
private Map<String, Map<Integer, Part>> partList = new HashMap<>();
/**
* Constructs OzoneBucket instance.
*
@ -160,4 +163,52 @@ public OmMultipartInfo initiateMultipartUpload(String keyName,
multipartUploadIdMap.put(keyName, uploadID);
return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID);
}
@Override
public OzoneOutputStream createMultipartKey(String key, long size,
int partNumber, String uploadID)
throws IOException {
String multipartUploadID = multipartUploadIdMap.get(key);
if (multipartUploadID == null || multipartUploadID != uploadID) {
throw new IOException("NO_SUCH_MULTIPART_UPLOAD_ERROR");
} else {
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream((int) size) {
@Override
public void close() throws IOException {
Part part = new Part(key + size,
toByteArray());
if (partList.get(key) == null) {
Map<Integer, Part> parts = new HashMap<>();
parts.put(partNumber, part);
partList.put(key, parts);
} else {
partList.get(key).put(partNumber, part);
}
}
};
return new OzoneOutputStreamStub(byteArrayOutputStream, key + size);
}
}
/**
* Class used to hold part information in a upload part request.
*/
public class Part {
private String partName;
private byte[] content;
public Part(String name, byte[] data) {
this.partName = name;
this.content = data;
}
public String getPartName() {
return partName;
}
public byte[] getContent() {
return content;
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.hadoop.ozone.client;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import java.io.IOException;
import java.io.OutputStream;
/**
* OzoneOutputStream stub for testing.
*/
public class OzoneOutputStreamStub extends OzoneOutputStream {
private final String partName;
/**
* Constructs OzoneOutputStreamStub with outputStream and partName.
*
* @param outputStream
* @param name - partName
*/
public OzoneOutputStreamStub(OutputStream outputStream, String name) {
super(outputStream);
this.partName = name;
}
@Override
public void write(int b) throws IOException {
getOutputStream().write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
getOutputStream().write(b, off, len);
}
@Override
public synchronized void flush() throws IOException {
getOutputStream().flush();
}
@Override
public synchronized void close() throws IOException {
//commitKey can be done here, if needed.
getOutputStream().close();
}
@Override
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
return new OmMultipartCommitUploadPartInfo(partName);
}
}

View File

@ -85,7 +85,7 @@ public void testPutObject() throws IOException, OS3Exception {
//WHEN
Response response = objectEndpoint.put(bucketName, keyName, CONTENT
.length(), body);
.length(), 1, null, body);
//THEN
@ -117,7 +117,7 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
//WHEN
Response response = objectEndpoint.put(bucketName, keyName,
chunkedContent.length(),
chunkedContent.length(), 1, null,
new ByteArrayInputStream(chunkedContent.getBytes()));
//THEN
@ -142,7 +142,7 @@ public void testCopyObject() throws IOException, OS3Exception {
keyName = "sourceKey";
Response response = objectEndpoint.put(bucketName, keyName,
CONTENT.length(), body);
CONTENT.length(), 1, null, body);
String volumeName = clientStub.getObjectStore().getOzoneVolumeName(
bucketName);
@ -161,7 +161,8 @@ public void testCopyObject() throws IOException, OS3Exception {
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
bucketName + "/" + keyName);
response = objectEndpoint.put(destBucket, destkey, CONTENT.length(), body);
response = objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1,
null, body);
// Check destination key and response
volumeName = clientStub.getObjectStore().getOzoneVolumeName(destBucket);
@ -175,7 +176,7 @@ public void testCopyObject() throws IOException, OS3Exception {
// source and dest same
try {
objectEndpoint.put(bucketName, keyName, CONTENT.length(), body);
objectEndpoint.put(bucketName, keyName, CONTENT.length(), 1, null, body);
fail("test copy object failed");
} catch (OS3Exception ex) {
Assert.assertTrue(ex.getErrorMessage().contains("This copy request is " +
@ -186,7 +187,7 @@ public void testCopyObject() throws IOException, OS3Exception {
try {
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
nonexist + "/" + keyName);
objectEndpoint.put(destBucket, destkey, CONTENT.length(),
objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1, null,
body);
fail("test copy object failed");
} catch (OS3Exception ex) {
@ -197,7 +198,7 @@ public void testCopyObject() throws IOException, OS3Exception {
try {
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
bucketName + "/" + keyName);
objectEndpoint.put(nonexist, destkey, CONTENT.length(), body);
objectEndpoint.put(nonexist, destkey, CONTENT.length(), 1, null, body);
fail("test copy object failed");
} catch (OS3Exception ex) {
Assert.assertTrue(ex.getCode().contains("NoSuchBucket"));
@ -207,7 +208,7 @@ public void testCopyObject() throws IOException, OS3Exception {
try {
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
nonexist + "/" + keyName);
objectEndpoint.put(nonexist, destkey, CONTENT.length(), body);
objectEndpoint.put(nonexist, destkey, CONTENT.length(), 1, null, body);
fail("test copy object failed");
} catch (OS3Exception ex) {
Assert.assertTrue(ex.getCode().contains("NoSuchBucket"));
@ -217,7 +218,8 @@ public void testCopyObject() throws IOException, OS3Exception {
try {
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
bucketName + "/" + nonexist);
objectEndpoint.put("nonexistent", keyName, CONTENT.length(), body);
objectEndpoint.put("nonexistent", keyName, CONTENT.length(), 1,
null, body);
fail("test copy object failed");
} catch (OS3Exception ex) {
Assert.assertTrue(ex.getCode().contains("NoSuchBucket"));
@ -235,7 +237,7 @@ public void testInvalidStorageType() throws IOException {
try {
Response response = objectEndpoint.put(bucketName, keyName,
CONTENT.length(), body);
CONTENT.length(), 1, null, body);
fail("testInvalidStorageType");
} catch (OS3Exception ex) {
assertEquals(S3ErrorTable.INVALID_ARGUMENT.getErrorMessage(),
@ -252,8 +254,8 @@ public void testEmptyStorageType() throws IOException, OS3Exception {
keyName = "sourceKey";
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("");
Response response =
objectEndpoint.put(bucketName, keyName, CONTENT.length(), body);
Response response = objectEndpoint.put(bucketName, keyName, CONTENT
.length(), 1, null, body);
String volumeName = clientStub.getObjectStore()
.getOzoneVolumeName(bucketName);

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.hadoop.ozone.s3.endpoint;
import org.apache.hadoop.ozone.client.OzoneClientStub;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
/**
* This class tests Upload part request.
*/
public class TestPartUpload {
private final static ObjectEndpoint REST = new ObjectEndpoint();;
private final static String BUCKET = "s3bucket";
private final static String KEY = "key1";
@BeforeClass
public static void setUp() throws Exception {
OzoneClientStub client = new OzoneClientStub();
client.getObjectStore().createS3Bucket("ozone", BUCKET);
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
"STANDARD");
REST.setHeaders(headers);
REST.setClient(client);
}
@Test
public void testPartUpload() throws Exception {
Response response = REST.initiateMultipartUpload(BUCKET, KEY, "");
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
(MultipartUploadInitiateResponse) response.getEntity();
assertNotNull(multipartUploadInitiateResponse.getUploadID());
String uploadID = multipartUploadInitiateResponse.getUploadID();
assertEquals(response.getStatus(), 200);
String content = "Multipart Upload";
ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes());
response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body);
assertNotNull(response.getHeaderString("ETag"));
}
@Test
public void testPartUploadWithOverride() throws Exception {
Response response = REST.initiateMultipartUpload(BUCKET, KEY, "");
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
(MultipartUploadInitiateResponse) response.getEntity();
assertNotNull(multipartUploadInitiateResponse.getUploadID());
String uploadID = multipartUploadInitiateResponse.getUploadID();
assertEquals(response.getStatus(), 200);
String content = "Multipart Upload";
ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes());
response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body);
assertNotNull(response.getHeaderString("ETag"));
String eTag = response.getHeaderString("ETag");
// Upload part again with same part Number, the ETag should be changed.
content = "Multipart Upload Changed";
response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body);
assertNotNull(response.getHeaderString("ETag"));
assertNotEquals(eTag, response.getHeaderString("ETag"));
}
@Test
public void testPartUploadWithIncorrectUploadID() throws Exception {
try {
String content = "Multipart Upload With Incorrect uploadID";
ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes());
REST.put(BUCKET, KEY, content.length(), 1, "random", body);
fail("testPartUploadWithIncorrectUploadID failed");
} catch (OS3Exception ex) {
assertEquals("NoSuchUpload", ex.getCode());
assertEquals(HTTP_NOT_FOUND, ex.getHttpCode());
}
}
}