diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
index b57ff3dc3a..a700a9fd0b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
@@ -16,12 +16,6 @@
*/
package org.apache.hadoop.fs;
-import com.google.common.base.Charsets;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -29,13 +23,26 @@
import java.util.List;
import java.util.stream.Collectors;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import static org.apache.hadoop.fs.Path.mergePaths;
+
/**
* A MultipartUploader that uses the basic FileSystem commands.
* This is done in three stages:
- * Init - create a temp _multipart directory.
- * PutPart - copying the individual parts of the file to the temp directory.
- * Complete - use {@link FileSystem#concat} to merge the files; and then delete
- * the temp directory.
+ * <ul>
+ * <li>Init - create a temp {@code _multipart} directory.</li>
+ * <li>PutPart - copying the individual parts of the file to the temp
+ * directory.</li>
+ * <li>Complete - use {@link FileSystem#concat} to merge the files;
+ * and then delete the temp directory.</li>
+ * </ul>
*/
public class FileSystemMultipartUploader extends MultipartUploader {
@@ -64,28 +71,44 @@ public PartHandle putPart(Path filePath, InputStream inputStream,
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
Path partPath =
- Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR),
+ mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR),
new Path(Integer.toString(partNumber) + ".part")));
- FSDataOutputStreamBuilder outputStream = fs.createFile(partPath);
- FSDataOutputStream fsDataOutputStream = outputStream.build();
- IOUtils.copy(inputStream, fsDataOutputStream, 4096);
- fsDataOutputStream.close();
+ try(FSDataOutputStream fsDataOutputStream =
+ fs.createFile(partPath).build()) {
+ IOUtils.copy(inputStream, fsDataOutputStream, 4096);
+ } finally {
+ org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream);
+ }
return BBPartHandle.from(ByteBuffer.wrap(
partPath.toString().getBytes(Charsets.UTF_8)));
}
private Path createCollectorPath(Path filePath) {
- return Path.mergePaths(filePath.getParent(),
- Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
- Path.mergePaths(new Path("_multipart"),
+ return mergePaths(filePath.getParent(),
+ mergePaths(new Path(filePath.getName().split("\\.")[0]),
+ mergePaths(new Path("_multipart"),
new Path(Path.SEPARATOR))));
}
+ private PathHandle getPathHandle(Path filePath) throws IOException {
+ FileStatus status = fs.getFileStatus(filePath);
+ return fs.getPathHandle(status);
+ }
+
@Override
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
public PathHandle complete(Path filePath,
List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
throws IOException {
+
+ if (handles.isEmpty()) {
+ throw new IOException("Empty upload");
+ }
+ // If destination already exists, we believe we already completed it.
+ if (fs.exists(filePath)) {
+ return getPathHandle(filePath);
+ }
+
handles.sort(Comparator.comparing(Pair::getKey));
List<Path> partHandles = handles
.stream()
@@ -97,22 +120,26 @@ public PathHandle complete(Path filePath,
.collect(Collectors.toList());
Path collectorPath = createCollectorPath(filePath);
- Path filePathInsideCollector = Path.mergePaths(collectorPath,
+ Path filePathInsideCollector = mergePaths(collectorPath,
new Path(Path.SEPARATOR + filePath.getName()));
fs.create(filePathInsideCollector).close();
fs.concat(filePathInsideCollector,
partHandles.toArray(new Path[handles.size()]));
fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
fs.delete(collectorPath, true);
- FileStatus status = fs.getFileStatus(filePath);
- return fs.getPathHandle(status);
+ return getPathHandle(filePath);
}
@Override
public void abort(Path filePath, UploadHandle uploadId) throws IOException {
byte[] uploadIdByteArray = uploadId.toByteArray();
+ Preconditions.checkArgument(uploadIdByteArray.length != 0,
+ "UploadId is empty");
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
+
+ // force a check for a file existing; raises FNFE if not found
+ fs.getFileStatus(collectorPath);
fs.delete(collectorPath, true);
}
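Note for readers: in this implementation the upload handle is nothing more than the collector path encoded as UTF-8 bytes, which putPart() and abort() above decode again with new String(..., Charsets.UTF_8). A minimal sketch of that round trip, assuming only the BBUploadHandle/UploadHandle classes used elsewhere in this patch; the path string is hypothetical:

import java.nio.ByteBuffer;

import com.google.common.base.Charsets;

import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.UploadHandle;

public class UploadHandleRoundTrip {
  public static void main(String[] args) {
    // Hypothetical collector path; the real one is derived from the target file.
    String collectorPath = "/tmp/example/_multipart";
    UploadHandle handle = BBUploadHandle.from(
        ByteBuffer.wrap(collectorPath.getBytes(Charsets.UTF_8)));
    // Callers only ever see opaque bytes; the uploader decodes them back.
    byte[] raw = handle.toByteArray();
    String decoded = new String(raw, 0, raw.length, Charsets.UTF_8);
    System.out.println(decoded.equals(collectorPath)); // true
  }
}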
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
index 24a92169a2..47fd9f29b9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -21,17 +21,20 @@
import java.io.InputStream;
import java.util.List;
-import org.apache.commons.lang3.tuple.Pair;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+
/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes. Users should:
- * 1. Initialize an upload
- * 2. Upload parts in any order
- * 3. Complete the upload in order to have it materialize in the destination FS.
+ * <ul>
+ * <li>Initialize an upload</li>
+ * <li>Upload parts in any order</li>
+ * <li>Complete the upload in order to have it materialize in the destination
+ * FS</li>
+ * </ul>
*
* Implementers should make sure that the complete function should make sure
* that 'complete' will reorder parts if the destination FS doesn't already
@@ -45,7 +48,7 @@ public abstract class MultipartUploader {
* Initialize a multipart upload.
* @param filePath Target path for upload.
* @return unique identifier associating part uploads.
- * @throws IOException
+ * @throws IOException IO failure
*/
public abstract UploadHandle initialize(Path filePath) throws IOException;
@@ -53,12 +56,13 @@ public abstract class MultipartUploader {
* Put part as part of a multipart upload. It should be possible to have
* parts uploaded in any order (or in parallel).
* @param filePath Target path for upload (same as {@link #initialize(Path)}).
- * @param inputStream Data for this part.
+ * @param inputStream Data for this part. Implementations MUST close this
+ * stream after reading in the data.
* @param partNumber Index of the part relative to others.
* @param uploadId Identifier from {@link #initialize(Path)}.
* @param lengthInBytes Target length to read from the stream.
* @return unique PartHandle identifier for the uploaded part.
- * @throws IOException
+ * @throws IOException IO failure
*/
public abstract PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
@@ -67,12 +71,12 @@ public abstract PartHandle putPart(Path filePath, InputStream inputStream,
/**
* Complete a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
- * @param handles Identifiers with associated part numbers from
- * {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
+ * @param handles non-empty list of identifiers with associated part numbers
+ * from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
* Depending on the backend, the list order may be significant.
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
* @return unique PathHandle identifier for the uploaded file.
- * @throws IOException
+ * @throws IOException IO failure or the handle list is empty.
*/
public abstract PathHandle complete(Path filePath,
List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
@@ -81,10 +85,10 @@ public abstract PathHandle complete(Path filePath,
/**
* Aborts a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
- * @param multipartuploadId Identifier from {@link #initialize(Path)}.
- * @throws IOException
+ * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+ * @throws IOException IO failure
*/
- public abstract void abort(Path filePath, UploadHandle multipartuploadId)
+ public abstract void abort(Path filePath, UploadHandle multipartUploadId)
throws IOException;
}
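Taken together, the lifecycle described in this interface's javadoc looks roughly like the sketch below; it mirrors the contract test added later in this patch. The destination path and part contents are made up, and it assumes the default FileSystem has a MultipartUploaderFactory registered for its scheme:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadExample {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    Path dest = new Path("/tmp/multipart-demo/target.bin");  // hypothetical

    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
    UploadHandle upload = mpu.initialize(dest);

    List<Pair<Integer, PartHandle>> parts = new ArrayList<>();
    for (int partNumber = 1; partNumber <= 3; partNumber++) {
      byte[] data = ("part-" + partNumber + "\n")
          .getBytes(StandardCharsets.UTF_8);
      // putPart takes ownership of the stream and closes it after reading.
      PartHandle part = mpu.putPart(dest,
          new ByteArrayInputStream(data), partNumber, upload, data.length);
      parts.add(Pair.of(partNumber, part));
    }

    // complete() needs a non-empty handle list and materializes dest.
    PathHandle fd = mpu.complete(dest, parts, upload);
    System.out.println("Completed upload, path handle of "
        + fd.toByteArray().length + " bytes");
  }
}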
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
index df70b746cc..47ce3ab189 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
@@ -16,14 +16,14 @@
*/
package org.apache.hadoop.fs;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
import java.io.Serializable;
import java.nio.ByteBuffer;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
- * Opaque, serializable reference to an part id for multipart uploads.
+ * Opaque, serializable reference to a part id for multipart uploads.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
index 60aa6a53bf..d5304ba549 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
@@ -25,15 +25,16 @@
/**
* Opaque, serializable reference to an entity in the FileSystem. May contain
- * metadata sufficient to resolve or verify subsequent accesses indepedent of
+ * metadata sufficient to resolve or verify subsequent accesses independent of
* other modifications to the FileSystem.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
+@FunctionalInterface
public interface PathHandle extends Serializable {
/**
- * @return Serialized from in bytes.
+ * @return Serialized form in bytes.
*/
default byte[] toByteArray() {
ByteBuffer bb = bytes();
@@ -42,6 +43,10 @@ default byte[] toByteArray() {
return ret;
}
+ /**
+ * Get the bytes of this path handle.
+ * @return the bytes to pass to the process completing the upload.
+ */
ByteBuffer bytes();
@Override
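Because the patch also marks PathHandle as a @FunctionalInterface with bytes() as its single abstract method, an implementation can be supplied as a lambda over the serialized bytes, which is exactly what the S3A uploader later in this patch does with (PathHandle) () -> ByteBuffer.wrap(eTag). A small illustrative sketch with a made-up tag value:

import java.nio.ByteBuffer;

import com.google.common.base.Charsets;

import org.apache.hadoop.fs.PathHandle;

public class LambdaPathHandle {
  public static void main(String[] args) {
    byte[] etag = "example-etag".getBytes(Charsets.UTF_8);  // hypothetical
    // bytes() is the single abstract method, so a lambda suffices.
    PathHandle handle = () -> ByteBuffer.wrap(etag);
    // toByteArray() is the default method documented above; it copies bytes().
    System.out.println(handle.toByteArray().length == etag.length);  // true
  }
}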
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
deleted file mode 100644
index f132089a9e..0000000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractSystemMultipartUploaderTest {
-
- abstract FileSystem getFS() throws IOException;
-
- abstract Path getBaseTestPath();
-
- @Test
- public void testMultipartUpload() throws Exception {
- FileSystem fs = getFS();
- Path file = new Path(getBaseTestPath(), "some-file");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
- StringBuilder sb = new StringBuilder();
- for (int i = 1; i <= 100; ++i) {
- String contents = "ThisIsPart" + i + "\n";
- sb.append(contents);
- int len = contents.getBytes().length;
- InputStream is = IOUtils.toInputStream(contents, "UTF-8");
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
- partHandles.add(Pair.of(i, partHandle));
- }
- PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
- byte[] fdData = IOUtils.toByteArray(fs.open(fd));
- byte[] fileData = IOUtils.toByteArray(fs.open(file));
- String readString = new String(fdData);
- assertEquals(sb.toString(), readString);
- assertArrayEquals(fdData, fileData);
- }
-
- @Test
- public void testMultipartUploadReverseOrder() throws Exception {
- FileSystem fs = getFS();
- Path file = new Path(getBaseTestPath(), "some-file");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
- StringBuilder sb = new StringBuilder();
- for (int i = 1; i <= 100; ++i) {
- String contents = "ThisIsPart" + i + "\n";
- sb.append(contents);
- }
- for (int i = 100; i > 0; --i) {
- String contents = "ThisIsPart" + i + "\n";
- int len = contents.getBytes().length;
- InputStream is = IOUtils.toInputStream(contents, "UTF-8");
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
- partHandles.add(Pair.of(i, partHandle));
- }
- PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
- byte[] fdData = IOUtils.toByteArray(fs.open(fd));
- byte[] fileData = IOUtils.toByteArray(fs.open(file));
- String readString = new String(fdData);
- assertEquals(sb.toString(), readString);
- assertArrayEquals(fdData, fileData);
- }
-
- @Test
- public void testMultipartUploadReverseOrderNoNContiguousPartNumbers()
- throws Exception {
- FileSystem fs = getFS();
- Path file = new Path(getBaseTestPath(), "some-file");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
- StringBuilder sb = new StringBuilder();
- for (int i = 2; i <= 200; i += 2) {
- String contents = "ThisIsPart" + i + "\n";
- sb.append(contents);
- }
- for (int i = 200; i > 0; i -= 2) {
- String contents = "ThisIsPart" + i + "\n";
- int len = contents.getBytes().length;
- InputStream is = IOUtils.toInputStream(contents, "UTF-8");
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
- partHandles.add(Pair.of(i, partHandle));
- }
- PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
- byte[] fdData = IOUtils.toByteArray(fs.open(fd));
- byte[] fileData = IOUtils.toByteArray(fs.open(file));
- String readString = new String(fdData);
- assertEquals(sb.toString(), readString);
- assertArrayEquals(fdData, fileData);
- }
-
- @Test
- public void testMultipartUploadAbort() throws Exception {
- FileSystem fs = getFS();
- Path file = new Path(getBaseTestPath(), "some-file");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- for (int i = 100; i >= 50; --i) {
- String contents = "ThisIsPart" + i + "\n";
- int len = contents.getBytes().length;
- InputStream is = IOUtils.toInputStream(contents, "UTF-8");
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
- }
- mpu.abort(file, uploadHandle);
-
- String contents = "ThisIsPart49\n";
- int len = contents.getBytes().length;
- InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-
- try {
- mpu.putPart(file, is, 49, uploadHandle, len);
- fail("putPart should have thrown an exception");
- } catch (IOException ok) {
- // ignore
- }
- }
-}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
new file mode 100644
index 0000000000..c0e1600d52
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import org.junit.Test;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractMultipartUploaderTest extends
+ AbstractFSContractTestBase {
+
+ /**
+ * The payload is the part number repeated for the length of the part.
+ * This makes checking the correctness of the upload straightforward.
+ * @param partNumber part number
+ * @return the bytes to upload.
+ */
+ private byte[] generatePayload(int partNumber) {
+ int sizeInBytes = partSizeInBytes();
+ ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+ for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
+ buffer.putInt(partNumber);
+ }
+ return buffer.array();
+ }
+
+ /**
+ * Load a path, make an MD5 digest.
+ * @param path path to load
+ * @return the digest array
+ * @throws IOException failure to read or digest the file.
+ */
+ protected byte[] digest(Path path) throws IOException {
+ FileSystem fs = getFileSystem();
+ try (InputStream in = fs.open(path)) {
+ byte[] fdData = IOUtils.toByteArray(in);
+ MessageDigest newDigest = DigestUtils.getMd5Digest();
+ return newDigest.digest(fdData);
+ }
+ }
+
+ /**
+ * Get the partition size in bytes to use for each upload.
+ * @return a number > 0
+ */
+ protected abstract int partSizeInBytes();
+
+ /**
+ * Get the number of test payloads to upload.
+ * @return a number > 1
+ */
+ protected int getTestPayloadCount() {
+ return 10;
+ }
+
+ /**
+ * Assert that a multipart upload is successful.
+ * @throws Exception failure
+ */
+ @Test
+ public void testSingleUpload() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path file = path("testSingleUpload");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ UploadHandle uploadHandle = mpu.initialize(file);
+ List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ MessageDigest origDigest = DigestUtils.getMd5Digest();
+ byte[] payload = generatePayload(1);
+ origDigest.update(payload);
+ InputStream is = new ByteArrayInputStream(payload);
+ PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
+ payload.length);
+ partHandles.add(Pair.of(1, partHandle));
+ PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
+ origDigest,
+ payload.length);
+
+ // Complete is idempotent
+ PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+ assertArrayEquals("Path handles differ", fd.toByteArray(),
+ fd2.toByteArray());
+ }
+
+ private PathHandle completeUpload(final Path file,
+ final MultipartUploader mpu,
+ final UploadHandle uploadHandle,
+ final List<Pair<Integer, PartHandle>> partHandles,
+ final MessageDigest origDigest,
+ final int expectedLength) throws IOException {
+ PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+
+ FileStatus status = verifyPathExists(getFileSystem(),
+ "Completed file", file);
+ assertEquals("length of " + status,
+ expectedLength, status.getLen());
+
+ assertArrayEquals("digest of source and " + file
+ + " differ",
+ origDigest.digest(), digest(file));
+ return fd;
+ }
+
+ /**
+ * Assert that a multipart upload is successful.
+ * @throws Exception failure
+ */
+ @Test
+ public void testMultipartUpload() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path file = path("testMultipartUpload");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ UploadHandle uploadHandle = mpu.initialize(file);
+ List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ MessageDigest origDigest = DigestUtils.getMd5Digest();
+ final int payloadCount = getTestPayloadCount();
+ for (int i = 1; i <= payloadCount; ++i) {
+ byte[] payload = generatePayload(i);
+ origDigest.update(payload);
+ InputStream is = new ByteArrayInputStream(payload);
+ PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+ payload.length);
+ partHandles.add(Pair.of(i, partHandle));
+ }
+ completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+ payloadCount * partSizeInBytes());
+ }
+
+ /**
+ * Assert that a multipart upload is successful even when the parts are
+ * given in the reverse order.
+ */
+ @Test
+ public void testMultipartUploadReverseOrder() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path file = path("testMultipartUploadReverseOrder");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ UploadHandle uploadHandle = mpu.initialize(file);
+ List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ MessageDigest origDigest = DigestUtils.getMd5Digest();
+ final int payloadCount = getTestPayloadCount();
+ for (int i = 1; i <= payloadCount; ++i) {
+ byte[] payload = generatePayload(i);
+ origDigest.update(payload);
+ }
+ for (int i = payloadCount; i > 0; --i) {
+ byte[] payload = generatePayload(i);
+ InputStream is = new ByteArrayInputStream(payload);
+ PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+ payload.length);
+ partHandles.add(Pair.of(i, partHandle));
+ }
+ completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+ payloadCount * partSizeInBytes());
+ }
+
+ /**
+ * Assert that a multipart upload is successful even when the parts are
+ * given in reverse order and the part numbers are not contiguous.
+ */
+ @Test
+ public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
+ throws Exception {
+ describe("Upload in reverse order and the part numbers are not contiguous");
+ FileSystem fs = getFileSystem();
+ Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ UploadHandle uploadHandle = mpu.initialize(file);
+ List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ MessageDigest origDigest = DigestUtils.getMd5Digest();
+ int payloadCount = 2 * getTestPayloadCount();
+ for (int i = 2; i <= payloadCount; i += 2) {
+ byte[] payload = generatePayload(i);
+ origDigest.update(payload);
+ }
+ for (int i = payloadCount; i > 0; i -= 2) {
+ byte[] payload = generatePayload(i);
+ InputStream is = new ByteArrayInputStream(payload);
+ PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+ payload.length);
+ partHandles.add(Pair.of(i, partHandle));
+ }
+ completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+ getTestPayloadCount() * partSizeInBytes());
+ }
+
+ /**
+ * Assert that when we abort a multipart upload, the resulting file does
+ * not show up.
+ */
+ @Test
+ public void testMultipartUploadAbort() throws Exception {
+ describe("Upload and then abort it before completing");
+ FileSystem fs = getFileSystem();
+ Path file = path("testMultipartUploadAbort");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ UploadHandle uploadHandle = mpu.initialize(file);
+ List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ for (int i = 20; i >= 10; --i) {
+ byte[] payload = generatePayload(i);
+ InputStream is = new ByteArrayInputStream(payload);
+ PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+ payload.length);
+ partHandles.add(Pair.of(i, partHandle));
+ }
+ mpu.abort(file, uploadHandle);
+
+ String contents = "ThisIsPart49\n";
+ int len = contents.getBytes(Charsets.UTF_8).length;
+ InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+
+ intercept(IOException.class,
+ () -> mpu.putPart(file, is, 49, uploadHandle, len));
+ intercept(IOException.class,
+ () -> mpu.complete(file, partHandles, uploadHandle));
+
+ assertPathDoesNotExist("Uploaded file should not exist", file);
+ }
+
+ /**
+ * Trying to abort from an invalid handle must fail.
+ */
+ @Test
+ public void testAbortUnknownUpload() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path file = path("testAbortUnknownUpload");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(
+ "invalid-handle".getBytes(Charsets.UTF_8));
+ UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
+ intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle));
+ }
+
+ /**
+ * Trying to abort with a handle of size 0 must fail.
+ */
+ @Test
+ public void testAbortEmptyUploadHandle() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path file = path("testAbortEmptyUpload");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
+ UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
+ intercept(IllegalArgumentException.class,
+ () -> mpu.abort(file, uploadHandle));
+ }
+
+ /**
+ * When we complete with no parts provided, it must fail.
+ */
+ @Test
+ public void testCompleteEmptyUpload() throws Exception {
+ describe("Expect an empty MPU to fail, but still be abortable");
+ FileSystem fs = getFileSystem();
+ Path dest = path("testCompleteEmptyUpload");
+ MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ UploadHandle handle = mpu.initialize(dest);
+ intercept(IOException.class,
+ () -> mpu.complete(dest, new ArrayList<>(), handle));
+ mpu.abort(dest, handle);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
similarity index 53%
rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
index 21d01b6cdb..a50d2e41b1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
@@ -15,51 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.fs;
+package org.apache.hadoop.fs.contract.localfs;
import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.IOException;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Test the FileSystemMultipartUploader on local file system.
*/
-public class TestLocalFileSystemMultipartUploader
- extends AbstractSystemMultipartUploaderTest {
-
- private static FileSystem fs;
- private File tmp;
-
- @BeforeClass
- public static void init() throws IOException {
- fs = LocalFileSystem.getLocal(new Configuration());
- }
-
- @Before
- public void setup() throws IOException {
- tmp = getRandomizedTestDir();
- tmp.mkdirs();
- }
-
- @After
- public void tearDown() throws IOException {
- tmp.delete();
- }
+public class TestLocalFSContractMultipartUploader
+ extends AbstractContractMultipartUploaderTest {
@Override
- public FileSystem getFS() {
- return fs;
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new LocalFSContract(conf);
}
+ /**
+ * There is no real need to upload any particular size.
+ * @return 1 kilobyte
+ */
@Override
- public Path getBaseTestPath() {
- return new Path(tmp.getAbsolutePath());
+ protected int partSizeInBytes() {
+ return 1024;
}
-
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
deleted file mode 100644
index 96c50938b3..0000000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-public class TestHDFSMultipartUploader
- extends AbstractSystemMultipartUploaderTest {
-
- private static MiniDFSCluster cluster;
- private Path tmp;
-
- @Rule
- public TestName name = new TestName();
-
- @BeforeClass
- public static void init() throws IOException {
- HdfsConfiguration conf = new HdfsConfiguration();
- cluster = new MiniDFSCluster.Builder(conf,
- GenericTestUtils.getRandomizedTestDir())
- .numDataNodes(1)
- .build();
- cluster.waitClusterUp();
- }
-
- @AfterClass
- public static void cleanup() throws IOException {
- if (cluster != null) {
- cluster.shutdown();
- cluster = null;
- }
- }
-
- @Before
- public void setup() throws IOException {
- tmp = new Path(cluster.getFileSystem().getWorkingDirectory(),
- name.getMethodName());
- cluster.getFileSystem().mkdirs(tmp);
- }
-
- @Override
- public FileSystem getFS() throws IOException {
- return cluster.getFileSystem();
- }
-
- @Override
- public Path getBaseTestPath() {
- return tmp;
- }
-
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
new file mode 100644
index 0000000000..f3a5265de7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.hdfs;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test MultipartUploader tests on HDFS.
+ */
+public class TestHDFSContractMultipartUploader extends
+ AbstractContractMultipartUploaderTest {
+
+ @BeforeClass
+ public static void createCluster() throws IOException {
+ HDFSContract.createCluster();
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws IOException {
+ HDFSContract.destroyCluster();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HDFSContract(conf);
+ }
+
+ /**
+ * HDFS doesn't have any restriction on the part size.
+ * @return 1KB
+ */
+ @Override
+ protected int partSizeInBytes() {
+ return 1024;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
index 34c88d43f6..6a1df54bd6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
@@ -17,15 +17,26 @@
*/
package org.apache.hadoop.fs.s3a;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BBPartHandle;
@@ -37,13 +48,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
/**
* MultipartUploader for S3AFileSystem. This uses the S3 multipart
@@ -53,6 +59,10 @@ public class S3AMultipartUploader extends MultipartUploader {
private final S3AFileSystem s3a;
+ /** Header for Parts: {@value}. */
+
+ public static final String HEADER = "S3A-part01";
+
public S3AMultipartUploader(FileSystem fs, Configuration conf) {
if (!(fs instanceof S3AFileSystem)) {
throw new IllegalArgumentException(
@@ -63,75 +73,72 @@ public S3AMultipartUploader(FileSystem fs, Configuration conf) {
@Override
public UploadHandle initialize(Path filePath) throws IOException {
+ final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String key = s3a.pathToKey(filePath);
- InitiateMultipartUploadRequest request =
- new InitiateMultipartUploadRequest(s3a.getBucket(), key);
- LOG.debug("initialize request: {}", request);
- InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request);
- String uploadId = result.getUploadId();
+ String uploadId = writeHelper.initiateMultiPartUpload(key);
return BBUploadHandle.from(ByteBuffer.wrap(
uploadId.getBytes(Charsets.UTF_8)));
}
@Override
public PartHandle putPart(Path filePath, InputStream inputStream,
- int partNumber, UploadHandle uploadId, long lengthInBytes) {
- String key = s3a.pathToKey(filePath);
- UploadPartRequest request = new UploadPartRequest();
- byte[] uploadIdBytes = uploadId.toByteArray();
- request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
- Charsets.UTF_8));
- request.setInputStream(inputStream);
- request.setPartSize(lengthInBytes);
- request.setPartNumber(partNumber);
- request.setBucketName(s3a.getBucket());
- request.setKey(key);
- LOG.debug("putPart request: {}", request);
- UploadPartResult result = s3a.uploadPart(request);
- String eTag = result.getETag();
- return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8)));
- }
-
- @Override
- public PathHandle complete(Path filePath,
- List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) {
- String key = s3a.pathToKey(filePath);
- CompleteMultipartUploadRequest request =
- new CompleteMultipartUploadRequest();
- request.setBucketName(s3a.getBucket());
- request.setKey(key);
- byte[] uploadIdBytes = uploadId.toByteArray();
- request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
- Charsets.UTF_8));
- List<PartETag> eTags = handles
- .stream()
- .map(handle -> {
- byte[] partEtagBytes = handle.getRight().toByteArray();
- return new PartETag(handle.getLeft(),
- new String(partEtagBytes, 0, partEtagBytes.length,
- Charsets.UTF_8));
- })
- .collect(Collectors.toList());
- request.setPartETags(eTags);
- LOG.debug("Complete request: {}", request);
- CompleteMultipartUploadResult completeMultipartUploadResult =
- s3a.getAmazonS3Client().completeMultipartUpload(request);
-
- byte[] eTag = DFSUtilClient.string2Bytes(
- completeMultipartUploadResult.getETag());
- return (PathHandle) () -> ByteBuffer.wrap(eTag);
- }
-
- @Override
- public void abort(Path filePath, UploadHandle uploadId) {
+ int partNumber, UploadHandle uploadId, long lengthInBytes)
+ throws IOException {
+ final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String key = s3a.pathToKey(filePath);
byte[] uploadIdBytes = uploadId.toByteArray();
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
- AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a
- .getBucket(), key, uploadIdString);
- LOG.debug("Abort request: {}", request);
- s3a.getAmazonS3Client().abortMultipartUpload(request);
+ UploadPartRequest request = writeHelper.newUploadPartRequest(key,
+ uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L);
+ UploadPartResult result = writeHelper.uploadPart(request);
+ String eTag = result.getETag();
+ return BBPartHandle.from(
+ ByteBuffer.wrap(
+ buildPartHandlePayload(eTag, lengthInBytes)));
+ }
+
+ @Override
+ public PathHandle complete(Path filePath,
+ List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId)
+ throws IOException {
+ byte[] uploadIdBytes = uploadId.toByteArray();
+ checkUploadId(uploadIdBytes);
+ if (handles.isEmpty()) {
+ throw new IOException("Empty upload");
+ }
+
+ final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
+ String key = s3a.pathToKey(filePath);
+
+ String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
+ Charsets.UTF_8);
+ ArrayList<PartETag> eTags = new ArrayList<>();
+ eTags.ensureCapacity(handles.size());
+ long totalLength = 0;
+ for (Pair<Integer, PartHandle> handle : handles) {
+ byte[] payload = handle.getRight().toByteArray();
+ Pair<Long, String> result = parsePartHandlePayload(payload);
+ totalLength += result.getLeft();
+ eTags.add(new PartETag(handle.getLeft(), result.getRight()));
+ }
+ AtomicInteger errorCount = new AtomicInteger(0);
+ CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
+ key, uploadIdStr, eTags, totalLength, errorCount);
+
+ byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
+ return (PathHandle) () -> ByteBuffer.wrap(eTag);
+ }
+
+ @Override
+ public void abort(Path filePath, UploadHandle uploadId) throws IOException {
+ final byte[] uploadIdBytes = uploadId.toByteArray();
+ checkUploadId(uploadIdBytes);
+ final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
+ String key = s3a.pathToKey(filePath);
+ String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
+ Charsets.UTF_8);
+ writeHelper.abortMultipartCommit(key, uploadIdString);
}
/**
@@ -141,10 +148,64 @@ public static class Factory extends MultipartUploaderFactory {
@Override
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
- if (fs.getScheme().equals("s3a")) {
+ if (FS_S3A.equals(fs.getScheme())) {
return new S3AMultipartUploader(fs, conf);
}
return null;
}
}
+
+ private void checkUploadId(byte[] uploadId) throws IllegalArgumentException {
+ Preconditions.checkArgument(uploadId.length > 0,
+ "Empty UploadId is not valid");
+ }
+
+ /**
+ * Build the payload for marshalling.
+ * @param eTag upload etag
+ * @param len length
+ * @return a byte array to marshall.
+ * @throws IOException error writing the payload
+ */
+ @VisibleForTesting
+ static byte[] buildPartHandlePayload(String eTag, long len)
+ throws IOException {
+ Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
+ "Empty etag");
+ Preconditions.checkArgument(len > 0,
+ "Invalid length");
+
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ try(DataOutputStream output = new DataOutputStream(bytes)) {
+ output.writeUTF(HEADER);
+ output.writeLong(len);
+ output.writeUTF(eTag);
+ }
+ return bytes.toByteArray();
+ }
+
+ /**
+ * Parse the payload marshalled as a part handle.
+ * @param data handle data
+ * @return the length and etag
+ * @throws IOException error reading the payload
+ */
+ static Pair<Long, String> parsePartHandlePayload(byte[] data)
+ throws IOException {
+
+ try(DataInputStream input =
+ new DataInputStream(new ByteArrayInputStream(data))) {
+ final String header = input.readUTF();
+ if (!HEADER.equals(header)) {
+ throw new IOException("Wrong header string: \"" + header + "\"");
+ }
+ final long len = input.readLong();
+ final String etag = input.readUTF();
+ if (len <= 0) {
+ throw new IOException("Negative length");
+ }
+ return Pair.of(len, etag);
+ }
+ }
+
}
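For reference, the part handle produced by putPart() above is not just the etag: buildPartHandlePayload() marshals a header, the part length and the etag with DataOutputStream, and parsePartHandlePayload() reverses it during complete(). A standalone sketch of that wire format follows; the real methods are package-private, so this re-implements them purely for illustration, with a made-up etag and length:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PartHandleFormatDemo {
  // Same header constant as S3AMultipartUploader.HEADER in this patch.
  private static final String HEADER = "S3A-part01";

  static byte[] marshall(String etag, long len) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(HEADER);  // format marker
      out.writeLong(len);    // part length
      out.writeUTF(etag);    // S3 etag of the uploaded part
    }
    return bytes.toByteArray();
  }

  static void unmarshall(byte[] data) throws IOException {
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(data))) {
      String header = in.readUTF();
      if (!HEADER.equals(header)) {
        throw new IOException("Wrong header string: \"" + header + "\"");
      }
      long len = in.readLong();
      String etag = in.readUTF();
      System.out.println(len + " bytes, etag " + etag);
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical etag and a 5 MB part length.
    unmarshall(marshall("0123456789abcdef", 5L * 1024 * 1024));
  }
}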
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 46ca65c203..a85a87f7f4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -219,6 +219,10 @@ private CompleteMultipartUploadResult finalizeMultipartUpload(
List<PartETag> partETags,
long length,
Retried retrying) throws IOException {
+ if (partETags.isEmpty()) {
+ throw new IOException(
+ "No upload parts in multipart upload to " + destKey);
+ }
return invoker.retry("Completing multipart commit", destKey,
true,
retrying,
diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
similarity index 100%
rename from hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
rename to hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
new file mode 100644
index 0000000000..d28f39bafd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
+
+/**
+ * Test MultipartUploader with S3A.
+ */
+public class ITestS3AContractMultipartUploader extends
+ AbstractContractMultipartUploaderTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class);
+
+ private int partitionSize;
+
+ /**
+ * S3 requires a minimum part size of 5MB (except the last part).
+ * @return 5MB
+ */
+ @Override
+ protected int partSizeInBytes() {
+ return partitionSize;
+ }
+
+ @Override
+ protected int getTestPayloadCount() {
+ return 3;
+ }
+
+ @Override
+ public S3AFileSystem getFileSystem() {
+ return (S3AFileSystem) super.getFileSystem();
+ }
+
+ /**
+ * Create a configuration, possibly patching in S3Guard options.
+ * @return a configuration
+ */
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ maybeEnableS3Guard(conf);
+ return conf;
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new S3AContract(conf);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ Configuration conf = getContract().getConf();
+ boolean enabled = getTestPropertyBool(
+ conf,
+ KEY_SCALE_TESTS_ENABLED,
+ DEFAULT_SCALE_TESTS_ENABLED);
+ assume("Scale test disabled: to enable set property " +
+ KEY_SCALE_TESTS_ENABLED,
+ enabled);
+ partitionSize = (int) getTestPropertyBytes(conf,
+ KEY_HUGE_PARTITION_SIZE,
+ DEFAULT_HUGE_PARTITION_SIZE);
+ }
+
+ /**
+ * Extend superclass teardown with actions to help clean up the S3 store,
+ * including aborting uploads under the test path.
+ */
+ @Override
+ public void teardown() throws Exception {
+ Path teardown = path("teardown").getParent();
+ S3AFileSystem fs = getFileSystem();
+ WriteOperationHelper helper = fs.getWriteOperationHelper();
+ try {
+ LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+ int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown));
+ LOG.info("Found {} incomplete uploads", count);
+ } catch (IOException e) {
+ LOG.warn("IOE in teardown", e);
+ }
+ super.teardown();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 0f7b418c1e..ce2a98ecb2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -105,6 +105,11 @@ public interface S3ATestConstants {
*/
String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize";
+ /**
+ * Size of partitions to upload: {@value}.
+ */
+ String DEFAULT_HUGE_PARTITION_SIZE = "8M";
+
/**
* The default huge size is small —full 5GB+ scale tests are something
* to run in long test runs on EC2 VMs. {@value}.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
new file mode 100644
index 0000000000..35d0460526
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*;
+import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test multipart upload support methods and classes.
+ */
+public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
+
+ @Test
+ public void testRoundTrip() throws Throwable {
+ Pair<Long, String> result = roundTrip("tag", 1);
+ assertEquals("tag", result.getRight());
+ assertEquals(1, result.getLeft().longValue());
+ }
+
+ @Test
+ public void testRoundTrip2() throws Throwable {
+ long len = 1L + Integer.MAX_VALUE;
+ Pair<Long, String> result = roundTrip("11223344",
+ len);
+ assertEquals("11223344", result.getRight());
+ assertEquals(len, result.getLeft().longValue());
+ }
+
+ @Test
+ public void testNoEtag() throws Throwable {
+ intercept(IllegalArgumentException.class,
+ () -> buildPartHandlePayload("", 1));
+ }
+
+ @Test
+ public void testNoLen() throws Throwable {
+ intercept(IllegalArgumentException.class,
+ () -> buildPartHandlePayload("tag", 0));
+ }
+
+ @Test
+ public void testBadPayload() throws Throwable {
+ intercept(EOFException.class,
+ () -> parsePartHandlePayload(new byte[0]));
+ }
+
+ @Test
+ public void testBadHeader() throws Throwable {
+ byte[] bytes = buildPartHandlePayload("tag", 1);
+ bytes[2]='f';
+ intercept(IOException.class, "header",
+ () -> parsePartHandlePayload(bytes));
+ }
+
+ private Pair<Long, String> roundTrip(final String tag, final long len) throws IOException {
+ byte[] bytes = buildPartHandlePayload(tag, len);
+ return parsePartHandlePayload(bytes);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
index 4df3912c0a..55e4dc717a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -83,7 +83,9 @@ protected List<SinglePendingCommit> listPendingUploadsToCommit(
commit.setDestinationKey(key);
commit.setUri("s3a://" + BUCKET + "/" + key);
commit.setUploadId(UUID.randomUUID().toString());
- commit.setEtags(new ArrayList<>());
+ ArrayList<String> etags = new ArrayList<>();
+ etags.add("tag1");
+ commit.setEtags(etags);
pending.add(commit);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 02236eba44..88a19d574c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -64,7 +64,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractSTestS3AHugeFiles.class);
public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
- public static final String DEFAULT_PARTITION_SIZE = "8M";
+
private Path scaleTestDir;
private Path hugefile;
private Path hugefileRenamed;
@@ -101,7 +101,7 @@ protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
partitionSize = (int) getTestPropertyBytes(conf,
KEY_HUGE_PARTITION_SIZE,
- DEFAULT_PARTITION_SIZE);
+ DEFAULT_HUGE_PARTITION_SIZE);
assertTrue("Partition size too small: " + partitionSize,
partitionSize > MULTIPART_MIN_SIZE);
conf.setLong(SOCKET_SEND_BUFFER, _1MB);
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
index fe0af663fd..ec4c54ae39 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
@@ -107,6 +107,11 @@
    <value>true</value>
  </property>

+  <property>
+    <name>fs.contract.supports-multipartuploader</name>
+    <value>true</value>
+  </property>
+
  <property>
    <name>fs.contract.supports-unix-permissions</name>
    <value>false</value>