From d66925a315644f3c3bcde589f365fc01f0033d32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Mon, 7 Jan 2019 12:39:46 +0100 Subject: [PATCH] HDDS-902. MultipartUpload: S3 API for uploading a part file. Contributed by Bharat Viswanadham. --- .../main/smoketest/s3/MultipartUpload.robot | 37 ++++- .../ozone/s3/endpoint/ObjectEndpoint.java | 39 ++++++ .../ozone/s3/exception/S3ErrorTable.java | 5 + .../hadoop/ozone/client/OzoneBucketStub.java | 51 +++++++ .../ozone/client/OzoneOutputStreamStub.java | 73 ++++++++++ .../ozone/s3/endpoint/TestObjectPut.java | 26 ++-- .../ozone/s3/endpoint/TestPartUpload.java | 126 ++++++++++++++++++ 7 files changed, 341 insertions(+), 16 deletions(-) create mode 100644 hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java create mode 100644 hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot index f5eee56770..251d0277cf 100644 --- a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot +++ b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot @@ -21,22 +21,51 @@ Resource ../commonlib.robot Resource commonawslib.robot Test Setup Setup s3 tests +*** Keywords *** +Create Random file for mac + Execute dd if=/dev/urandom of=/tmp/part1 bs=1m count=5 + +Create Random file for linux + Execute dd if=/dev/urandom of=/tmp/part1 bs=1M count=5 + + *** Variables *** ${ENDPOINT_URL} http://s3g:9878 ${BUCKET} generated *** Test Cases *** -Initiate Multipart Upload - ${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey +Test Multipart Upload + ${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey --storage-class REDUCED_REDUNDANCY ${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0 Should contain ${result} ${BUCKET} Should contain ${result} multipartKey Should contain ${result} UploadId # initiate again - ${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey + ${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey --storage-class REDUCED_REDUNDANCY ${nextUploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0 Should contain ${result} ${BUCKET} Should contain ${result} multipartKey Should contain ${result} UploadId - Should Not Be Equal ${uploadID} ${nextUploadID} \ No newline at end of file + Should Not Be Equal ${uploadID} ${nextUploadID} + +# upload part +# each part should be minimum 5mb, other wise during complete multipart +# upload we get error entity too small. So, considering further complete +# multipart upload, uploading each part as 5MB file, exception is for last part + + ${system} = Evaluate platform.system() platform + Run Keyword if '${system}' == 'Darwin' Create Random file for mac + Run Keyword if '${system}' == 'Linux' Create Random file for linux + ${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID} + Should contain ${result} ETag +# override part + Run Keyword if '${system}' == 'Darwin' Create Random file for mac + Run Keyword if '${system}' == 'Linux' Create Random file for linux + ${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/part1 --upload-id ${nextUploadID} + Should contain ${result} ETag + +Upload part with Incorrect uploadID + Execute echo "Multipart upload" > /tmp/testfile + ${result} = Execute AWSS3APICli and checkrc upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/testfile --upload-id "random" 255 + Should contain ${result} NoSuchUpload \ No newline at end of file diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index ec25fdaca2..5e3807b0ad 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.s3.endpoint; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.HEAD; import javax.ws.rs.HeaderParam; @@ -48,6 +49,7 @@ import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.s3.SignedChunksInputStream; import org.apache.hadoop.ozone.s3.exception.OS3Exception; @@ -64,6 +66,8 @@ import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED; import org.apache.commons.io.IOUtils; + +import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD; import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER; @@ -108,9 +112,18 @@ public Response put( @PathParam("bucket") String bucketName, @PathParam("path") String keyPath, @HeaderParam("Content-Length") long length, + @QueryParam("partNumber") int partNumber, + @QueryParam("uploadId") @DefaultValue("") String uploadID, InputStream body) throws IOException, OS3Exception { OzoneOutputStream output = null; + + if (!uploadID.equals("")) { + // If uploadID is specified, it is a request for upload part + return createMultipartKey(bucketName, keyPath, length, + partNumber, uploadID, body); + } + try { String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER); String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER); @@ -390,6 +403,32 @@ public Response initiateMultipartUpload( } + private Response createMultipartKey(String bucket, String key, long length, + int partNumber, String uploadID, + InputStream body) + throws IOException, OS3Exception { + try { + OzoneBucket ozoneBucket = getBucket(bucket); + OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey( + key, length, partNumber, uploadID); + IOUtils.copy(body, ozoneOutputStream); + ozoneOutputStream.close(); + OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo = + ozoneOutputStream.getCommitUploadPartInfo(); + return Response.status(Status.OK).header("ETag", + omMultipartCommitUploadPartInfo.getPartName()).build(); + + } catch (IOException ex) { + if (ex.getMessage().contains("NO_SUCH_MULTIPART_UPLOAD_ERROR")) { + OS3Exception os3Exception = S3ErrorTable.newError(NO_SUCH_UPLOAD, + uploadID); + throw os3Exception; + } + throw ex; + } + + } + @VisibleForTesting public void setHeaders(HttpHeaders headers) { this.headers = headers; diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java index 4ca36e4d80..0582c45f81 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java @@ -63,6 +63,11 @@ private S3ErrorTable() { "InvalidRange", "The requested range is not satisfiable", RANGE_NOT_SATISFIABLE); + public static final OS3Exception NO_SUCH_UPLOAD = new OS3Exception( + "NoSuchUpload", "The specified multipart upload does not exist. The " + + "upload ID might be invalid, or the multipart upload might have " + + "been aborted or completed.", HTTP_NOT_FOUND); + /** * Create a new instance of Error. * @param e Error Template diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 1aa4e5329d..cf8be6905a 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -49,6 +49,9 @@ public class OzoneBucketStub extends OzoneBucket { private Map keyContents = new HashMap<>(); private Map multipartUploadIdMap = new HashMap<>(); + + private Map> partList = new HashMap<>(); + /** * Constructs OzoneBucket instance. * @@ -160,4 +163,52 @@ public OmMultipartInfo initiateMultipartUpload(String keyName, multipartUploadIdMap.put(keyName, uploadID); return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID); } + + @Override + public OzoneOutputStream createMultipartKey(String key, long size, + int partNumber, String uploadID) + throws IOException { + String multipartUploadID = multipartUploadIdMap.get(key); + if (multipartUploadID == null || multipartUploadID != uploadID) { + throw new IOException("NO_SUCH_MULTIPART_UPLOAD_ERROR"); + } else { + ByteArrayOutputStream byteArrayOutputStream = + new ByteArrayOutputStream((int) size) { + @Override + public void close() throws IOException { + Part part = new Part(key + size, + toByteArray()); + if (partList.get(key) == null) { + Map parts = new HashMap<>(); + parts.put(partNumber, part); + partList.put(key, parts); + } else { + partList.get(key).put(partNumber, part); + } + } + }; + return new OzoneOutputStreamStub(byteArrayOutputStream, key + size); + } + } + + /** + * Class used to hold part information in a upload part request. + */ + public class Part { + private String partName; + private byte[] content; + + public Part(String name, byte[] data) { + this.partName = name; + this.content = data; + } + + public String getPartName() { + return partName; + } + + public byte[] getContent() { + return content; + } + } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java new file mode 100644 index 0000000000..28e377b428 --- /dev/null +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.hadoop.ozone.client; + +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * OzoneOutputStream stub for testing. + */ +public class OzoneOutputStreamStub extends OzoneOutputStream { + + private final String partName; + + /** + * Constructs OzoneOutputStreamStub with outputStream and partName. + * + * @param outputStream + * @param name - partName + */ + public OzoneOutputStreamStub(OutputStream outputStream, String name) { + super(outputStream); + this.partName = name; + } + + @Override + public void write(int b) throws IOException { + getOutputStream().write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + getOutputStream().write(b, off, len); + } + + @Override + public synchronized void flush() throws IOException { + getOutputStream().flush(); + } + + @Override + public synchronized void close() throws IOException { + //commitKey can be done here, if needed. + getOutputStream().close(); + } + + @Override + public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { + return new OmMultipartCommitUploadPartInfo(partName); + } + +} diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index d60ddd28be..839834cbdf 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -85,7 +85,7 @@ public void testPutObject() throws IOException, OS3Exception { //WHEN Response response = objectEndpoint.put(bucketName, keyName, CONTENT - .length(), body); + .length(), 1, null, body); //THEN @@ -117,7 +117,7 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception { //WHEN Response response = objectEndpoint.put(bucketName, keyName, - chunkedContent.length(), + chunkedContent.length(), 1, null, new ByteArrayInputStream(chunkedContent.getBytes())); //THEN @@ -142,7 +142,7 @@ public void testCopyObject() throws IOException, OS3Exception { keyName = "sourceKey"; Response response = objectEndpoint.put(bucketName, keyName, - CONTENT.length(), body); + CONTENT.length(), 1, null, body); String volumeName = clientStub.getObjectStore().getOzoneVolumeName( bucketName); @@ -161,7 +161,8 @@ public void testCopyObject() throws IOException, OS3Exception { when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn( bucketName + "/" + keyName); - response = objectEndpoint.put(destBucket, destkey, CONTENT.length(), body); + response = objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1, + null, body); // Check destination key and response volumeName = clientStub.getObjectStore().getOzoneVolumeName(destBucket); @@ -175,7 +176,7 @@ public void testCopyObject() throws IOException, OS3Exception { // source and dest same try { - objectEndpoint.put(bucketName, keyName, CONTENT.length(), body); + objectEndpoint.put(bucketName, keyName, CONTENT.length(), 1, null, body); fail("test copy object failed"); } catch (OS3Exception ex) { Assert.assertTrue(ex.getErrorMessage().contains("This copy request is " + @@ -186,7 +187,7 @@ public void testCopyObject() throws IOException, OS3Exception { try { when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn( nonexist + "/" + keyName); - objectEndpoint.put(destBucket, destkey, CONTENT.length(), + objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1, null, body); fail("test copy object failed"); } catch (OS3Exception ex) { @@ -197,7 +198,7 @@ public void testCopyObject() throws IOException, OS3Exception { try { when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn( bucketName + "/" + keyName); - objectEndpoint.put(nonexist, destkey, CONTENT.length(), body); + objectEndpoint.put(nonexist, destkey, CONTENT.length(), 1, null, body); fail("test copy object failed"); } catch (OS3Exception ex) { Assert.assertTrue(ex.getCode().contains("NoSuchBucket")); @@ -207,7 +208,7 @@ public void testCopyObject() throws IOException, OS3Exception { try { when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn( nonexist + "/" + keyName); - objectEndpoint.put(nonexist, destkey, CONTENT.length(), body); + objectEndpoint.put(nonexist, destkey, CONTENT.length(), 1, null, body); fail("test copy object failed"); } catch (OS3Exception ex) { Assert.assertTrue(ex.getCode().contains("NoSuchBucket")); @@ -217,7 +218,8 @@ public void testCopyObject() throws IOException, OS3Exception { try { when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn( bucketName + "/" + nonexist); - objectEndpoint.put("nonexistent", keyName, CONTENT.length(), body); + objectEndpoint.put("nonexistent", keyName, CONTENT.length(), 1, + null, body); fail("test copy object failed"); } catch (OS3Exception ex) { Assert.assertTrue(ex.getCode().contains("NoSuchBucket")); @@ -235,7 +237,7 @@ public void testInvalidStorageType() throws IOException { try { Response response = objectEndpoint.put(bucketName, keyName, - CONTENT.length(), body); + CONTENT.length(), 1, null, body); fail("testInvalidStorageType"); } catch (OS3Exception ex) { assertEquals(S3ErrorTable.INVALID_ARGUMENT.getErrorMessage(), @@ -252,8 +254,8 @@ public void testEmptyStorageType() throws IOException, OS3Exception { keyName = "sourceKey"; when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(""); - Response response = - objectEndpoint.put(bucketName, keyName, CONTENT.length(), body); + Response response = objectEndpoint.put(bucketName, keyName, CONTENT + .length(), 1, null, body); String volumeName = clientStub.getObjectStore() .getOzoneVolumeName(bucketName); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java new file mode 100644 index 0000000000..55b07c8803 --- /dev/null +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.hadoop.ozone.s3.endpoint; + +import org.apache.hadoop.ozone.client.OzoneClientStub; +import org.apache.hadoop.ozone.s3.exception.OS3Exception; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; + +import java.io.ByteArrayInputStream; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +/** + * This class tests Upload part request. + */ +public class TestPartUpload { + + private final static ObjectEndpoint REST = new ObjectEndpoint();; + private final static String BUCKET = "s3bucket"; + private final static String KEY = "key1"; + + @BeforeClass + public static void setUp() throws Exception { + + OzoneClientStub client = new OzoneClientStub(); + client.getObjectStore().createS3Bucket("ozone", BUCKET); + + + HttpHeaders headers = Mockito.mock(HttpHeaders.class); + when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn( + "STANDARD"); + + REST.setHeaders(headers); + REST.setClient(client); + } + + + @Test + public void testPartUpload() throws Exception { + + Response response = REST.initiateMultipartUpload(BUCKET, KEY, ""); + MultipartUploadInitiateResponse multipartUploadInitiateResponse = + (MultipartUploadInitiateResponse) response.getEntity(); + assertNotNull(multipartUploadInitiateResponse.getUploadID()); + String uploadID = multipartUploadInitiateResponse.getUploadID(); + + assertEquals(response.getStatus(), 200); + + String content = "Multipart Upload"; + ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes()); + response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body); + + assertNotNull(response.getHeaderString("ETag")); + + } + + @Test + public void testPartUploadWithOverride() throws Exception { + + Response response = REST.initiateMultipartUpload(BUCKET, KEY, ""); + MultipartUploadInitiateResponse multipartUploadInitiateResponse = + (MultipartUploadInitiateResponse) response.getEntity(); + assertNotNull(multipartUploadInitiateResponse.getUploadID()); + String uploadID = multipartUploadInitiateResponse.getUploadID(); + + assertEquals(response.getStatus(), 200); + + String content = "Multipart Upload"; + ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes()); + response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body); + + assertNotNull(response.getHeaderString("ETag")); + + String eTag = response.getHeaderString("ETag"); + + // Upload part again with same part Number, the ETag should be changed. + content = "Multipart Upload Changed"; + response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body); + assertNotNull(response.getHeaderString("ETag")); + assertNotEquals(eTag, response.getHeaderString("ETag")); + + } + + + @Test + public void testPartUploadWithIncorrectUploadID() throws Exception { + try { + String content = "Multipart Upload With Incorrect uploadID"; + ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes()); + REST.put(BUCKET, KEY, content.length(), 1, "random", body); + fail("testPartUploadWithIncorrectUploadID failed"); + } catch (OS3Exception ex) { + assertEquals("NoSuchUpload", ex.getCode()); + assertEquals(HTTP_NOT_FOUND, ex.getHttpCode()); + } + } +}