From 2ec97abb2e93c1a8127e7a146c08e26454b583fa Mon Sep 17 00:00:00 2001 From: Ewan Higgs Date: Wed, 8 Aug 2018 13:50:23 +0200 Subject: [PATCH] HADOOP-15576. S3A Multipart Uploader to work with S3Guard and encryption Originally contributed by Ewan Higgs with refinements by Steve Loughran. --- .../fs/FileSystemMultipartUploader.java | 69 ++-- .../apache/hadoop/fs/MultipartUploader.java | 32 +- .../java/org/apache/hadoop/fs/PartHandle.java | 8 +- .../java/org/apache/hadoop/fs/PathHandle.java | 9 +- .../AbstractSystemMultipartUploaderTest.java | 143 --------- ...AbstractContractMultipartUploaderTest.java | 300 ++++++++++++++++++ ...TestLocalFSContractMultipartUploader.java} | 50 +-- .../hadoop/fs/TestHDFSMultipartUploader.java | 76 ----- .../TestHDFSContractMultipartUploader.java | 58 ++++ .../hadoop/fs/s3a/S3AMultipartUploader.java | 199 ++++++++---- .../hadoop/fs/s3a/WriteOperationHelper.java | 4 + ....apache.hadoop.fs.MultipartUploaderFactory | 0 .../ITestS3AContractMultipartUploader.java | 116 +++++++ .../hadoop/fs/s3a/S3ATestConstants.java | 5 + .../s3a/TestS3AMultipartUploaderSupport.java | 84 +++++ .../TestStagingPartitionedJobCommit.java | 4 +- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 4 +- .../src/test/resources/contract/s3a.xml | 5 + 18 files changed, 798 insertions(+), 368 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java rename hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/{TestLocalFileSystemMultipartUploader.java => contract/localfs/TestLocalFSContractMultipartUploader.java} (53%) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java rename hadoop-tools/hadoop-aws/src/main/resources/META-INF/{ => services}/org.apache.hadoop.fs.MultipartUploaderFactory (100%) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java index b57ff3dc3a..a700a9fd0b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java @@ -16,12 +16,6 @@ */ package org.apache.hadoop.fs; -import com.google.common.base.Charsets; -import org.apache.commons.compress.utils.IOUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.FsPermission; - import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -29,13 +23,26 @@ import java.util.List; import java.util.stream.Collectors; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; + +import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; + +import static org.apache.hadoop.fs.Path.mergePaths; + /** * A MultipartUploader that uses the basic FileSystem commands. * This is done in three stages: - * Init - create a temp _multipart directory. - * PutPart - copying the individual parts of the file to the temp directory. - * Complete - use {@link FileSystem#concat} to merge the files; and then delete - * the temp directory. + * */ public class FileSystemMultipartUploader extends MultipartUploader { @@ -64,28 +71,44 @@ public PartHandle putPart(Path filePath, InputStream inputStream, Path collectorPath = new Path(new String(uploadIdByteArray, 0, uploadIdByteArray.length, Charsets.UTF_8)); Path partPath = - Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR), + mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR), new Path(Integer.toString(partNumber) + ".part"))); - FSDataOutputStreamBuilder outputStream = fs.createFile(partPath); - FSDataOutputStream fsDataOutputStream = outputStream.build(); - IOUtils.copy(inputStream, fsDataOutputStream, 4096); - fsDataOutputStream.close(); + try(FSDataOutputStream fsDataOutputStream = + fs.createFile(partPath).build()) { + IOUtils.copy(inputStream, fsDataOutputStream, 4096); + } finally { + org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream); + } return BBPartHandle.from(ByteBuffer.wrap( partPath.toString().getBytes(Charsets.UTF_8))); } private Path createCollectorPath(Path filePath) { - return Path.mergePaths(filePath.getParent(), - Path.mergePaths(new Path(filePath.getName().split("\\.")[0]), - Path.mergePaths(new Path("_multipart"), + return mergePaths(filePath.getParent(), + mergePaths(new Path(filePath.getName().split("\\.")[0]), + mergePaths(new Path("_multipart"), new Path(Path.SEPARATOR)))); } + private PathHandle getPathHandle(Path filePath) throws IOException { + FileStatus status = fs.getFileStatus(filePath); + return fs.getPathHandle(status); + } + @Override @SuppressWarnings("deprecation") // rename w/ OVERWRITE public PathHandle complete(Path filePath, List> handles, UploadHandle multipartUploadId) throws IOException { + + if (handles.isEmpty()) { + throw new IOException("Empty upload"); + } + // If destination already exists, we believe we already completed it. 
+ if (fs.exists(filePath)) { + return getPathHandle(filePath); + } + handles.sort(Comparator.comparing(Pair::getKey)); List partHandles = handles .stream() @@ -97,22 +120,26 @@ public PathHandle complete(Path filePath, .collect(Collectors.toList()); Path collectorPath = createCollectorPath(filePath); - Path filePathInsideCollector = Path.mergePaths(collectorPath, + Path filePathInsideCollector = mergePaths(collectorPath, new Path(Path.SEPARATOR + filePath.getName())); fs.create(filePathInsideCollector).close(); fs.concat(filePathInsideCollector, partHandles.toArray(new Path[handles.size()])); fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE); fs.delete(collectorPath, true); - FileStatus status = fs.getFileStatus(filePath); - return fs.getPathHandle(status); + return getPathHandle(filePath); } @Override public void abort(Path filePath, UploadHandle uploadId) throws IOException { byte[] uploadIdByteArray = uploadId.toByteArray(); + Preconditions.checkArgument(uploadIdByteArray.length != 0, + "UploadId is empty"); Path collectorPath = new Path(new String(uploadIdByteArray, 0, uploadIdByteArray.length, Charsets.UTF_8)); + + // force a check for a file existing; raises FNFE if not found + fs.getFileStatus(collectorPath); fs.delete(collectorPath, true); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java index 24a92169a2..47fd9f29b9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java @@ -21,17 +21,20 @@ import java.io.InputStream; import java.util.List; -import org.apache.commons.lang3.tuple.Pair; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; + /** * MultipartUploader is an interface for copying files multipart and across * multiple nodes. Users should: - * 1. Initialize an upload - * 2. Upload parts in any order - * 3. Complete the upload in order to have it materialize in the destination FS. + *
<ol>
+ *   <li>Initialize an upload</li>
+ *   <li>Upload parts in any order</li>
+ *   <li>Complete the upload in order to have it materialize in the destination
+ *   FS</li>
+ * </ol>
* * Implementers should make sure that the complete function should make sure * that 'complete' will reorder parts if the destination FS doesn't already @@ -45,7 +48,7 @@ public abstract class MultipartUploader { * Initialize a multipart upload. * @param filePath Target path for upload. * @return unique identifier associating part uploads. - * @throws IOException + * @throws IOException IO failure */ public abstract UploadHandle initialize(Path filePath) throws IOException; @@ -53,12 +56,13 @@ public abstract class MultipartUploader { * Put part as part of a multipart upload. It should be possible to have * parts uploaded in any order (or in parallel). * @param filePath Target path for upload (same as {@link #initialize(Path)}). - * @param inputStream Data for this part. + * @param inputStream Data for this part. Implementations MUST close this + * stream after reading in the data. * @param partNumber Index of the part relative to others. * @param uploadId Identifier from {@link #initialize(Path)}. * @param lengthInBytes Target length to read from the stream. * @return unique PartHandle identifier for the uploaded part. - * @throws IOException + * @throws IOException IO failure */ public abstract PartHandle putPart(Path filePath, InputStream inputStream, int partNumber, UploadHandle uploadId, long lengthInBytes) @@ -67,12 +71,12 @@ public abstract PartHandle putPart(Path filePath, InputStream inputStream, /** * Complete a multipart upload. * @param filePath Target path for upload (same as {@link #initialize(Path)}. - * @param handles Identifiers with associated part numbers from - * {@link #putPart(Path, InputStream, int, UploadHandle, long)}. + * @param handles non-empty list of identifiers with associated part numbers + * from {@link #putPart(Path, InputStream, int, UploadHandle, long)}. * Depending on the backend, the list order may be significant. * @param multipartUploadId Identifier from {@link #initialize(Path)}. * @return unique PathHandle identifier for the uploaded file. - * @throws IOException + * @throws IOException IO failure or the handle list is empty. */ public abstract PathHandle complete(Path filePath, List> handles, UploadHandle multipartUploadId) @@ -81,10 +85,10 @@ public abstract PathHandle complete(Path filePath, /** * Aborts a multipart upload. * @param filePath Target path for upload (same as {@link #initialize(Path)}. - * @param multipartuploadId Identifier from {@link #initialize(Path)}. - * @throws IOException + * @param multipartUploadId Identifier from {@link #initialize(Path)}. 
+ * @throws IOException IO failure */ - public abstract void abort(Path filePath, UploadHandle multipartuploadId) + public abstract void abort(Path filePath, UploadHandle multipartUploadId) throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java index df70b746cc..47ce3ab189 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java @@ -16,14 +16,14 @@ */ package org.apache.hadoop.fs; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - import java.io.Serializable; import java.nio.ByteBuffer; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + /** - * Opaque, serializable reference to an part id for multipart uploads. + * Opaque, serializable reference to a part id for multipart uploads. */ @InterfaceAudience.Public @InterfaceStability.Evolving diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java index 60aa6a53bf..d5304ba549 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java @@ -25,15 +25,16 @@ /** * Opaque, serializable reference to an entity in the FileSystem. May contain - * metadata sufficient to resolve or verify subsequent accesses indepedent of + * metadata sufficient to resolve or verify subsequent accesses independent of * other modifications to the FileSystem. */ @InterfaceAudience.Public @InterfaceStability.Evolving +@FunctionalInterface public interface PathHandle extends Serializable { /** - * @return Serialized from in bytes. + * @return Serialized form in bytes. */ default byte[] toByteArray() { ByteBuffer bb = bytes(); @@ -42,6 +43,10 @@ default byte[] toByteArray() { return ret; } + /** + * Get the bytes of this path handle. + * @return the bytes to get to the process completing the upload. + */ ByteBuffer bytes(); @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java deleted file mode 100644 index f132089a9e..0000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; - -import org.junit.Test; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public abstract class AbstractSystemMultipartUploaderTest { - - abstract FileSystem getFS() throws IOException; - - abstract Path getBaseTestPath(); - - @Test - public void testMultipartUpload() throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= 100; ++i) { - String contents = "ThisIsPart" + i + "\n"; - sb.append(contents); - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - partHandles.add(Pair.of(i, partHandle)); - } - PathHandle fd = mpu.complete(file, partHandles, uploadHandle); - byte[] fdData = IOUtils.toByteArray(fs.open(fd)); - byte[] fileData = IOUtils.toByteArray(fs.open(file)); - String readString = new String(fdData); - assertEquals(sb.toString(), readString); - assertArrayEquals(fdData, fileData); - } - - @Test - public void testMultipartUploadReverseOrder() throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= 100; ++i) { - String contents = "ThisIsPart" + i + "\n"; - sb.append(contents); - } - for (int i = 100; i > 0; --i) { - String contents = "ThisIsPart" + i + "\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - partHandles.add(Pair.of(i, partHandle)); - } - PathHandle fd = mpu.complete(file, partHandles, uploadHandle); - byte[] fdData = IOUtils.toByteArray(fs.open(fd)); - byte[] fileData = IOUtils.toByteArray(fs.open(file)); - String readString = new String(fdData); - assertEquals(sb.toString(), readString); - assertArrayEquals(fdData, fileData); - } - - @Test - public void testMultipartUploadReverseOrderNoNContiguousPartNumbers() - throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (int i = 2; i <= 200; i += 2) { - String contents = "ThisIsPart" + i + "\n"; - sb.append(contents); - } - for (int i = 200; i > 0; i -= 2) { - String contents = "ThisIsPart" + i + "\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - partHandles.add(Pair.of(i, partHandle)); - } - PathHandle fd = mpu.complete(file, partHandles, 
uploadHandle); - byte[] fdData = IOUtils.toByteArray(fs.open(fd)); - byte[] fileData = IOUtils.toByteArray(fs.open(file)); - String readString = new String(fdData); - assertEquals(sb.toString(), readString); - assertArrayEquals(fdData, fileData); - } - - @Test - public void testMultipartUploadAbort() throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - for (int i = 100; i >= 50; --i) { - String contents = "ThisIsPart" + i + "\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - } - mpu.abort(file, uploadHandle); - - String contents = "ThisIsPart49\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - - try { - mpu.putPart(file, is, 49, uploadHandle, len); - fail("putPart should have thrown an exception"); - } catch (IOException ok) { - // ignore - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java new file mode 100644 index 0000000000..c0e1600d52 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.contract; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.base.Charsets; +import org.junit.Test; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.BBUploadHandle; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.MultipartUploader; +import org.apache.hadoop.fs.MultipartUploaderFactory; +import org.apache.hadoop.fs.PartHandle; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathHandle; +import org.apache.hadoop.fs.UploadHandle; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +public abstract class AbstractContractMultipartUploaderTest extends + AbstractFSContractTestBase { + + /** + * The payload is the part number repeated for the length of the part. + * This makes checking the correctness of the upload straightforward. + * @param partNumber part number + * @return the bytes to upload. + */ + private byte[] generatePayload(int partNumber) { + int sizeInBytes = partSizeInBytes(); + ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes); + for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) { + buffer.putInt(partNumber); + } + return buffer.array(); + } + + /** + * Load a path, make an MD5 digest. + * @param path path to load + * @return the digest array + * @throws IOException failure to read or digest the file. + */ + protected byte[] digest(Path path) throws IOException { + FileSystem fs = getFileSystem(); + try (InputStream in = fs.open(path)) { + byte[] fdData = IOUtils.toByteArray(in); + MessageDigest newDigest = DigestUtils.getMd5Digest(); + return newDigest.digest(fdData); + } + } + + /** + * Get the partition size in bytes to use for each upload. + * @return a number > 0 + */ + protected abstract int partSizeInBytes(); + + /** + * Get the number of test payloads to upload. + * @return a number > 1 + */ + protected int getTestPayloadCount() { + return 10; + } + + /** + * Assert that a multipart upload is successful. 
+ * @throws Exception failure + */ + @Test + public void testSingleUpload() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testSingleUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + byte[] payload = generatePayload(1); + origDigest.update(payload); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle, + payload.length); + partHandles.add(Pair.of(1, partHandle)); + PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles, + origDigest, + payload.length); + + // Complete is idempotent + PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle); + assertArrayEquals("Path handles differ", fd.toByteArray(), + fd2.toByteArray()); + } + + private PathHandle completeUpload(final Path file, + final MultipartUploader mpu, + final UploadHandle uploadHandle, + final List> partHandles, + final MessageDigest origDigest, + final int expectedLength) throws IOException { + PathHandle fd = mpu.complete(file, partHandles, uploadHandle); + + FileStatus status = verifyPathExists(getFileSystem(), + "Completed file", file); + assertEquals("length of " + status, + expectedLength, status.getLen()); + + assertArrayEquals("digest of source and " + file + + " differ", + origDigest.digest(), digest(file)); + return fd; + } + + /** + * Assert that a multipart upload is successful. + * @throws Exception failure + */ + @Test + public void testMultipartUpload() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + final int payloadCount = getTestPayloadCount(); + for (int i = 1; i <= payloadCount; ++i) { + byte[] payload = generatePayload(i); + origDigest.update(payload); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + payloadCount * partSizeInBytes()); + } + + /** + * Assert that a multipart upload is successful even when the parts are + * given in the reverse order. 
+ */ + @Test + public void testMultipartUploadReverseOrder() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUploadReverseOrder"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + final int payloadCount = getTestPayloadCount(); + for (int i = 1; i <= payloadCount; ++i) { + byte[] payload = generatePayload(i); + origDigest.update(payload); + } + for (int i = payloadCount; i > 0; --i) { + byte[] payload = generatePayload(i); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + payloadCount * partSizeInBytes()); + } + + /** + * Assert that a multipart upload is successful even when the parts are + * given in reverse order and the part numbers are not contiguous. + */ + @Test + public void testMultipartUploadReverseOrderNonContiguousPartNumbers() + throws Exception { + describe("Upload in reverse order and the part numbers are not contiguous"); + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + int payloadCount = 2 * getTestPayloadCount(); + for (int i = 2; i <= payloadCount; i += 2) { + byte[] payload = generatePayload(i); + origDigest.update(payload); + } + for (int i = payloadCount; i > 0; i -= 2) { + byte[] payload = generatePayload(i); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + getTestPayloadCount() * partSizeInBytes()); + } + + /** + * Assert that when we abort a multipart upload, the resulting file does + * not show up. + */ + @Test + public void testMultipartUploadAbort() throws Exception { + describe("Upload and then abort it before completing"); + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUploadAbort"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List> partHandles = new ArrayList<>(); + for (int i = 20; i >= 10; --i) { + byte[] payload = generatePayload(i); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + mpu.abort(file, uploadHandle); + + String contents = "ThisIsPart49\n"; + int len = contents.getBytes(Charsets.UTF_8).length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + + intercept(IOException.class, + () -> mpu.putPart(file, is, 49, uploadHandle, len)); + intercept(IOException.class, + () -> mpu.complete(file, partHandles, uploadHandle)); + + assertPathDoesNotExist("Uploaded file should not exist", file); + } + + /** + * Trying to abort from an invalid handle must fail. 
+ */ + @Test + public void testAbortUnknownUpload() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testAbortUnknownUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + ByteBuffer byteBuffer = ByteBuffer.wrap( + "invalid-handle".getBytes(Charsets.UTF_8)); + UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer); + intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle)); + } + + /** + * Trying to abort with a handle of size 0 must fail. + */ + @Test + public void testAbortEmptyUploadHandle() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testAbortEmptyUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]); + UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer); + intercept(IllegalArgumentException.class, + () -> mpu.abort(file, uploadHandle)); + } + + /** + * When we complete with no parts provided, it must fail. + */ + @Test + public void testCompleteEmptyUpload() throws Exception { + describe("Expect an empty MPU to fail, but still be abortable"); + FileSystem fs = getFileSystem(); + Path dest = path("testCompleteEmptyUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle handle = mpu.initialize(dest); + intercept(IOException.class, + () -> mpu.complete(dest, new ArrayList<>(), handle)); + mpu.abort(dest, handle); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java similarity index 53% rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java index 21d01b6cdb..a50d2e41b1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java @@ -15,51 +15,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.fs; +package org.apache.hadoop.fs.contract.localfs; import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir; - -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.io.File; -import java.io.IOException; +import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Test the FileSystemMultipartUploader on local file system. 
*/ -public class TestLocalFileSystemMultipartUploader - extends AbstractSystemMultipartUploaderTest { - - private static FileSystem fs; - private File tmp; - - @BeforeClass - public static void init() throws IOException { - fs = LocalFileSystem.getLocal(new Configuration()); - } - - @Before - public void setup() throws IOException { - tmp = getRandomizedTestDir(); - tmp.mkdirs(); - } - - @After - public void tearDown() throws IOException { - tmp.delete(); - } +public class TestLocalFSContractMultipartUploader + extends AbstractContractMultipartUploaderTest { @Override - public FileSystem getFS() { - return fs; + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); } + /** + * There is no real need to upload any particular size. + * @return 1 kilobyte + */ @Override - public Path getBaseTestPath() { - return new Path(tmp.getAbsolutePath()); + protected int partSizeInBytes() { + return 1024; } - -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java deleted file mode 100644 index 96c50938b3..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.fs; - -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TestName; - -import java.io.IOException; - -public class TestHDFSMultipartUploader - extends AbstractSystemMultipartUploaderTest { - - private static MiniDFSCluster cluster; - private Path tmp; - - @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void init() throws IOException { - HdfsConfiguration conf = new HdfsConfiguration(); - cluster = new MiniDFSCluster.Builder(conf, - GenericTestUtils.getRandomizedTestDir()) - .numDataNodes(1) - .build(); - cluster.waitClusterUp(); - } - - @AfterClass - public static void cleanup() throws IOException { - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - @Before - public void setup() throws IOException { - tmp = new Path(cluster.getFileSystem().getWorkingDirectory(), - name.getMethodName()); - cluster.getFileSystem().mkdirs(tmp); - } - - @Override - public FileSystem getFS() throws IOException { - return cluster.getFileSystem(); - } - - @Override - public Path getBaseTestPath() { - return tmp; - } - -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java new file mode 100644 index 0000000000..f3a5265de7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.contract.hdfs; + +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test MultipartUploader tests on HDFS. + */ +public class TestHDFSContractMultipartUploader extends + AbstractContractMultipartUploaderTest { + + @BeforeClass + public static void createCluster() throws IOException { + HDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + HDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new HDFSContract(conf); + } + + /** + * HDFS doesn't have any restriction on the part size. 
+ * @return 1KB + */ + @Override + protected int partSizeInBytes() { + return 1024; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java index 34c88d43f6..6a1df54bd6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java @@ -17,15 +17,26 @@ */ package org.apache.hadoop.fs.s3a; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; + +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BBPartHandle; @@ -37,13 +48,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathHandle; import org.apache.hadoop.fs.UploadHandle; -import org.apache.hadoop.hdfs.DFSUtilClient; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Collectors; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A; /** * MultipartUploader for S3AFileSystem. This uses the S3 multipart @@ -53,6 +59,10 @@ public class S3AMultipartUploader extends MultipartUploader { private final S3AFileSystem s3a; + /** Header for Parts: {@value}. 
*/ + + public static final String HEADER = "S3A-part01"; + public S3AMultipartUploader(FileSystem fs, Configuration conf) { if (!(fs instanceof S3AFileSystem)) { throw new IllegalArgumentException( @@ -63,75 +73,72 @@ public S3AMultipartUploader(FileSystem fs, Configuration conf) { @Override public UploadHandle initialize(Path filePath) throws IOException { + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); String key = s3a.pathToKey(filePath); - InitiateMultipartUploadRequest request = - new InitiateMultipartUploadRequest(s3a.getBucket(), key); - LOG.debug("initialize request: {}", request); - InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request); - String uploadId = result.getUploadId(); + String uploadId = writeHelper.initiateMultiPartUpload(key); return BBUploadHandle.from(ByteBuffer.wrap( uploadId.getBytes(Charsets.UTF_8))); } @Override public PartHandle putPart(Path filePath, InputStream inputStream, - int partNumber, UploadHandle uploadId, long lengthInBytes) { - String key = s3a.pathToKey(filePath); - UploadPartRequest request = new UploadPartRequest(); - byte[] uploadIdBytes = uploadId.toByteArray(); - request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length, - Charsets.UTF_8)); - request.setInputStream(inputStream); - request.setPartSize(lengthInBytes); - request.setPartNumber(partNumber); - request.setBucketName(s3a.getBucket()); - request.setKey(key); - LOG.debug("putPart request: {}", request); - UploadPartResult result = s3a.uploadPart(request); - String eTag = result.getETag(); - return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8))); - } - - @Override - public PathHandle complete(Path filePath, - List> handles, UploadHandle uploadId) { - String key = s3a.pathToKey(filePath); - CompleteMultipartUploadRequest request = - new CompleteMultipartUploadRequest(); - request.setBucketName(s3a.getBucket()); - request.setKey(key); - byte[] uploadIdBytes = uploadId.toByteArray(); - request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length, - Charsets.UTF_8)); - List eTags = handles - .stream() - .map(handle -> { - byte[] partEtagBytes = handle.getRight().toByteArray(); - return new PartETag(handle.getLeft(), - new String(partEtagBytes, 0, partEtagBytes.length, - Charsets.UTF_8)); - }) - .collect(Collectors.toList()); - request.setPartETags(eTags); - LOG.debug("Complete request: {}", request); - CompleteMultipartUploadResult completeMultipartUploadResult = - s3a.getAmazonS3Client().completeMultipartUpload(request); - - byte[] eTag = DFSUtilClient.string2Bytes( - completeMultipartUploadResult.getETag()); - return (PathHandle) () -> ByteBuffer.wrap(eTag); - } - - @Override - public void abort(Path filePath, UploadHandle uploadId) { + int partNumber, UploadHandle uploadId, long lengthInBytes) + throws IOException { + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); String key = s3a.pathToKey(filePath); byte[] uploadIdBytes = uploadId.toByteArray(); String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length, Charsets.UTF_8); - AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a - .getBucket(), key, uploadIdString); - LOG.debug("Abort request: {}", request); - s3a.getAmazonS3Client().abortMultipartUpload(request); + UploadPartRequest request = writeHelper.newUploadPartRequest(key, + uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L); + UploadPartResult result = writeHelper.uploadPart(request); + String eTag = 
result.getETag(); + return BBPartHandle.from( + ByteBuffer.wrap( + buildPartHandlePayload(eTag, lengthInBytes))); + } + + @Override + public PathHandle complete(Path filePath, + List> handles, UploadHandle uploadId) + throws IOException { + byte[] uploadIdBytes = uploadId.toByteArray(); + checkUploadId(uploadIdBytes); + if (handles.isEmpty()) { + throw new IOException("Empty upload"); + } + + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); + String key = s3a.pathToKey(filePath); + + String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8); + ArrayList eTags = new ArrayList<>(); + eTags.ensureCapacity(handles.size()); + long totalLength = 0; + for (Pair handle : handles) { + byte[] payload = handle.getRight().toByteArray(); + Pair result = parsePartHandlePayload(payload); + totalLength += result.getLeft(); + eTags.add(new PartETag(handle.getLeft(), result.getRight())); + } + AtomicInteger errorCount = new AtomicInteger(0); + CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries( + key, uploadIdStr, eTags, totalLength, errorCount); + + byte[] eTag = result.getETag().getBytes(Charsets.UTF_8); + return (PathHandle) () -> ByteBuffer.wrap(eTag); + } + + @Override + public void abort(Path filePath, UploadHandle uploadId) throws IOException { + final byte[] uploadIdBytes = uploadId.toByteArray(); + checkUploadId(uploadIdBytes); + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); + String key = s3a.pathToKey(filePath); + String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8); + writeHelper.abortMultipartCommit(key, uploadIdString); } /** @@ -141,10 +148,64 @@ public static class Factory extends MultipartUploaderFactory { @Override protected MultipartUploader createMultipartUploader(FileSystem fs, Configuration conf) { - if (fs.getScheme().equals("s3a")) { + if (FS_S3A.equals(fs.getScheme())) { return new S3AMultipartUploader(fs, conf); } return null; } } + + private void checkUploadId(byte[] uploadId) throws IllegalArgumentException { + Preconditions.checkArgument(uploadId.length > 0, + "Empty UploadId is not valid"); + } + + /** + * Build the payload for marshalling. + * @param eTag upload etag + * @param len length + * @return a byte array to marshall. + * @throws IOException error writing the payload + */ + @VisibleForTesting + static byte[] buildPartHandlePayload(String eTag, long len) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(eTag), + "Empty etag"); + Preconditions.checkArgument(len > 0, + "Invalid length"); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try(DataOutputStream output = new DataOutputStream(bytes)) { + output.writeUTF(HEADER); + output.writeLong(len); + output.writeUTF(eTag); + } + return bytes.toByteArray(); + } + + /** + * Parse the payload marshalled as a part handle. 
+ * @param data handle data + * @return the length and etag + * @throws IOException error reading the payload + */ + static Pair parsePartHandlePayload(byte[] data) + throws IOException { + + try(DataInputStream input = + new DataInputStream(new ByteArrayInputStream(data))) { + final String header = input.readUTF(); + if (!HEADER.equals(header)) { + throw new IOException("Wrong header string: \"" + header + "\""); + } + final long len = input.readLong(); + final String etag = input.readUTF(); + if (len <= 0) { + throw new IOException("Negative length"); + } + return Pair.of(len, etag); + } + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 46ca65c203..a85a87f7f4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -219,6 +219,10 @@ private CompleteMultipartUploadResult finalizeMultipartUpload( List partETags, long length, Retried retrying) throws IOException { + if (partETags.isEmpty()) { + throw new IOException( + "No upload parts in multipart upload to " + destKey); + } return invoker.retry("Completing multipart commit", destKey, true, retrying, diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory similarity index 100% rename from hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory rename to hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java new file mode 100644 index 0000000000..d28f39bafd --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.contract.s3a; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; + +import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE; + +/** + * Test MultipartUploader with S3A. + */ +public class ITestS3AContractMultipartUploader extends + AbstractContractMultipartUploaderTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class); + + private int partitionSize; + + /** + * S3 requires a minimum part size of 5MB (except the last part). + * @return 5MB + */ + @Override + protected int partSizeInBytes() { + return partitionSize; + } + + @Override + protected int getTestPayloadCount() { + return 3; + } + + @Override + public S3AFileSystem getFileSystem() { + return (S3AFileSystem) super.getFileSystem(); + } + + /** + * Create a configuration, possibly patching in S3Guard options. + * @return a configuration + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + maybeEnableS3Guard(conf); + return conf; + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public void setup() throws Exception { + super.setup(); + Configuration conf = getContract().getConf(); + boolean enabled = getTestPropertyBool( + conf, + KEY_SCALE_TESTS_ENABLED, + DEFAULT_SCALE_TESTS_ENABLED); + assume("Scale test disabled: to enable set property " + + KEY_SCALE_TESTS_ENABLED, + enabled); + partitionSize = (int) getTestPropertyBytes(conf, + KEY_HUGE_PARTITION_SIZE, + DEFAULT_HUGE_PARTITION_SIZE); + } + + /** + * Extend superclass teardown with actions to help clean up the S3 store, + * including aborting uploads under the test path. + */ + @Override + public void teardown() throws Exception { + Path teardown = path("teardown").getParent(); + S3AFileSystem fs = getFileSystem(); + WriteOperationHelper helper = fs.getWriteOperationHelper(); + try { + LOG.info("Teardown: aborting outstanding uploads under {}", teardown); + int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown)); + LOG.info("Found {} incomplete uploads", count); + } catch (IOException e) { + LOG.warn("IOE in teardown", e); + } + super.teardown(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java index 0f7b418c1e..ce2a98ecb2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java @@ -105,6 +105,11 @@ public interface S3ATestConstants { */ String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize"; + /** + * Size of partitions to upload: {@value}. + */ + String DEFAULT_HUGE_PARTITION_SIZE = "8M"; + /** * The default huge size is small —full 5GB+ scale tests are something * to run in long test runs on EC2 VMs. {@value}. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java new file mode 100644 index 0000000000..35d0460526 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; +import java.io.IOException; + +import org.junit.Test; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*; +import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test multipart upload support methods and classes. + */ +public class TestS3AMultipartUploaderSupport extends HadoopTestBase { + + @Test + public void testRoundTrip() throws Throwable { + Pair result = roundTrip("tag", 1); + assertEquals("tag", result.getRight()); + assertEquals(1, result.getLeft().longValue()); + } + + @Test + public void testRoundTrip2() throws Throwable { + long len = 1L + Integer.MAX_VALUE; + Pair result = roundTrip("11223344", + len); + assertEquals("11223344", result.getRight()); + assertEquals(len, result.getLeft().longValue()); + } + + @Test + public void testNoEtag() throws Throwable { + intercept(IllegalArgumentException.class, + () -> buildPartHandlePayload("", 1)); + } + + @Test + public void testNoLen() throws Throwable { + intercept(IllegalArgumentException.class, + () -> buildPartHandlePayload("tag", 0)); + } + + @Test + public void testBadPayload() throws Throwable { + intercept(EOFException.class, + () -> parsePartHandlePayload(new byte[0])); + } + + @Test + public void testBadHeader() throws Throwable { + byte[] bytes = buildPartHandlePayload("tag", 1); + bytes[2]='f'; + intercept(IOException.class, "header", + () -> parsePartHandlePayload(bytes)); + } + + private Pair roundTrip(final String tag, final long len) throws IOException { + byte[] bytes = buildPartHandlePayload(tag, len); + return parsePartHandlePayload(bytes); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java index 4df3912c0a..55e4dc717a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java @@ -83,7 +83,9 @@ protected List listPendingUploadsToCommit( commit.setDestinationKey(key); commit.setUri("s3a://" + BUCKET + "/" + key); commit.setUploadId(UUID.randomUUID().toString()); - commit.setEtags(new ArrayList<>()); + ArrayList etags = new ArrayList<>(); + etags.add("tag1"); + commit.setEtags(etags); pending.add(commit); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 02236eba44..88a19d574c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -64,7 +64,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger( AbstractSTestS3AHugeFiles.class); public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB; - public static final String DEFAULT_PARTITION_SIZE = "8M"; + private Path scaleTestDir; private Path hugefile; private Path hugefileRenamed; @@ -101,7 +101,7 @@ protected Configuration createScaleConfiguration() { Configuration conf = super.createScaleConfiguration(); partitionSize = (int) getTestPropertyBytes(conf, KEY_HUGE_PARTITION_SIZE, - DEFAULT_PARTITION_SIZE); + DEFAULT_HUGE_PARTITION_SIZE); assertTrue("Partition size too small: " + partitionSize, partitionSize > MULTIPART_MIN_SIZE); conf.setLong(SOCKET_SEND_BUFFER, _1MB); diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml index fe0af663fd..ec4c54ae39 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml +++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml @@ -107,6 +107,11 @@ true + + fs.contract.supports-multipartuploader + true + + fs.contract.supports-unix-permissions false
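For reference, below is a minimal usage sketch (not part of the patch) of how a client drives the MultipartUploader API this change reworks: initialize an upload, putPart in any order, then complete. It mirrors the flow exercised by AbstractContractMultipartUploaderTest; the class name, destination path and tiny payloads are illustrative assumptions only, and note that S3A requires every part except the last to be at least 5MB.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadExample {
  public static void main(String[] args) throws Exception {
    // Destination path is an assumption; any filesystem with a registered
    // MultipartUploaderFactory (file://, hdfs://, s3a://) can be used.
    Path dest = new Path(args[0]);
    FileSystem fs = dest.getFileSystem(new Configuration());
    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);

    // 1. Initialize the upload and keep the opaque upload handle.
    UploadHandle upload = mpu.initialize(dest);

    // 2. Upload parts; order does not matter, but each part number must be
    //    kept with its PartHandle for the final complete() call.
    List<Pair<Integer, PartHandle>> parts = new ArrayList<>();
    for (int partNumber = 1; partNumber <= 3; partNumber++) {
      byte[] payload = ("part-" + partNumber + "\n").getBytes("UTF-8");
      try (InputStream in = new ByteArrayInputStream(payload)) {
        PartHandle part = mpu.putPart(dest, in, partNumber, upload,
            payload.length);
        parts.add(Pair.of(partNumber, part));
      }
    }

    // 3. Complete: the file only materializes at dest once this succeeds.
    PathHandle handle = mpu.complete(dest, parts, upload);
    System.out.println("Uploaded " + dest + ", path handle bytes: "
        + handle.toByteArray().length);
  }
}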