HDFS-13713. Add specification of Multipart Upload API to FS specification, with contract tests.

Contributed by Ewan Higgs and Steve Loughran.
Ewan Higgs 2018-11-29 15:11:07 +00:00 committed by Steve Loughran
parent b71cc7f33e
commit c1d24f8483
11 changed files with 877 additions and 179 deletions

View File

@@ -19,21 +19,23 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
 import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 import static org.apache.hadoop.fs.Path.mergePaths;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
  * A MultipartUploader that uses the basic FileSystem commands.
@@ -70,7 +72,8 @@ public UploadHandle initialize(Path filePath) throws IOException {
   public PartHandle putPart(Path filePath, InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
       throws IOException {
+    checkPutArguments(filePath, inputStream, partNumber, uploadId,
+        lengthInBytes);
     byte[] uploadIdByteArray = uploadId.toByteArray();
     checkUploadId(uploadIdByteArray);
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
@@ -82,16 +85,17 @@ public PartHandle putPart(Path filePath, InputStream inputStream,
         fs.createFile(partPath).build()) {
       IOUtils.copy(inputStream, fsDataOutputStream, 4096);
     } finally {
-      org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream);
+      cleanupWithLogger(LOG, inputStream);
     }
     return BBPartHandle.from(ByteBuffer.wrap(
         partPath.toString().getBytes(Charsets.UTF_8)));
   }
 
   private Path createCollectorPath(Path filePath) {
+    String uuid = UUID.randomUUID().toString();
     return mergePaths(filePath.getParent(),
         mergePaths(new Path(filePath.getName().split("\\.")[0]),
-            mergePaths(new Path("_multipart"),
+            mergePaths(new Path("_multipart_" + uuid),
                 new Path(Path.SEPARATOR))));
   }
 
@@ -110,21 +114,16 @@ private long totalPartsLen(List<Path> partHandles) throws IOException {
   @Override
   @SuppressWarnings("deprecation") // rename w/ OVERWRITE
-  public PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
-      throws IOException {
+  public PathHandle complete(Path filePath, Map<Integer, PartHandle> handleMap,
+      UploadHandle multipartUploadId) throws IOException {
     checkUploadId(multipartUploadId.toByteArray());
-    if (handles.isEmpty()) {
-      throw new IOException("Empty upload");
-    }
-    // If destination already exists, we believe we already completed it.
-    if (fs.exists(filePath)) {
-      return getPathHandle(filePath);
-    }
 
-    handles.sort(Comparator.comparing(Pair::getKey));
+    checkPartHandles(handleMap);
+    List<Map.Entry<Integer, PartHandle>> handles =
+        new ArrayList<>(handleMap.entrySet());
+    handles.sort(Comparator.comparingInt(Map.Entry::getKey));
+
     List<Path> partHandles = handles
         .stream()
         .map(pair -> {
@@ -134,7 +133,10 @@ public PathHandle complete(Path filePath,
         })
         .collect(Collectors.toList());
 
-    Path collectorPath = createCollectorPath(filePath);
+    byte[] uploadIdByteArray = multipartUploadId.toByteArray();
+    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
+        uploadIdByteArray.length, Charsets.UTF_8));
     boolean emptyFile = totalPartsLen(partHandles) == 0;
     if (emptyFile) {
       fs.create(filePath).close();

View File

@@ -17,39 +17,44 @@
  */
 package org.apache.hadoop.fs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
+import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
  * MultipartUploader is an interface for copying files multipart and across
  * multiple nodes. Users should:
  * <ol>
- *   <li>Initialize an upload</li>
- *   <li>Upload parts in any order</li>
+ *   <li>Initialize an upload.</li>
+ *   <li>Upload parts in any order.</li>
  *   <li>Complete the upload in order to have it materialize in the destination
- *   FS</li>
+ *   FS.</li>
 * </ol>
- *
- * Implementers should make sure that 'complete' will reorder parts if the
- * destination FS doesn't already do it for them.
 */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public abstract class MultipartUploader {
+public abstract class MultipartUploader implements Closeable {
   public static final Logger LOG =
       LoggerFactory.getLogger(MultipartUploader.class);
 
+  /**
+   * Perform any cleanup.
+   * The upload is not required to support any operations after this.
+   * @throws IOException problems on close.
+   */
+  @Override
+  public void close() throws IOException {
+  }
+
   /**
    * Initialize a multipart upload.
    * @param filePath Target path for upload.
@@ -59,8 +64,8 @@ public abstract class MultipartUploader {
   public abstract UploadHandle initialize(Path filePath) throws IOException;
 
   /**
-   * Put part as part of a multipart upload. It should be possible to have
-   * parts uploaded in any order (or in parallel).
+   * Put part as part of a multipart upload.
+   * It is possible to have parts uploaded in any order (or in parallel).
    * @param filePath Target path for upload (same as {@link #initialize(Path)}).
    * @param inputStream Data for this part. Implementations MUST close this
    *          stream after reading in the data.
@@ -77,15 +82,15 @@ public abstract PartHandle putPart(Path filePath, InputStream inputStream,
   /**
    * Complete a multipart upload.
    * @param filePath Target path for upload (same as {@link #initialize(Path)}).
-   * @param handles non-empty list of identifiers with associated part numbers
+   * @param handles non-empty map of part number to part handle.
    *          from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
-   *          Depending on the backend, the list order may be significant.
   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
   * @return unique PathHandle identifier for the uploaded file.
-   * @throws IOException IO failure or the handle list is empty.
+   * @throws IOException IO failure
   */
   public abstract PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
+      Map<Integer, PartHandle> handles,
+      UploadHandle multipartUploadId)
       throws IOException;
 
   /**
@@ -98,13 +103,52 @@ public abstract void abort(Path filePath, UploadHandle multipartUploadId)
       throws IOException;
 
   /**
-   * Utility method to validate uploadIDs
-   * @param uploadId
-   * @throws IllegalArgumentException
+   * Utility method to validate uploadIDs.
+   * @param uploadId Upload ID
+   * @throws IllegalArgumentException invalid ID
   */
   protected void checkUploadId(byte[] uploadId)
       throws IllegalArgumentException {
-    Preconditions.checkArgument(uploadId.length > 0,
+    checkArgument(uploadId != null, "null uploadId");
+    checkArgument(uploadId.length > 0,
         "Empty UploadId is not valid");
   }
+
+  /**
+   * Utility method to validate partHandles.
+   * @param partHandles handles
+   * @throws IllegalArgumentException if the parts are invalid
+   */
+  protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
+    checkArgument(!partHandles.isEmpty(),
+        "Empty upload");
+    partHandles.keySet()
+        .stream()
+        .forEach(key ->
+            checkArgument(key > 0,
+                "Invalid part handle index %s", key));
+  }
+
+  /**
+   * Check all the arguments to the
+   * {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation.
+   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+   * @param inputStream Data for this part. Implementations MUST close this
+   *          stream after reading in the data.
+   * @param partNumber Index of the part relative to others.
+   * @param uploadId Identifier from {@link #initialize(Path)}.
+   * @param lengthInBytes Target length to read from the stream.
+   * @throws IllegalArgumentException invalid argument
+   */
+  protected void checkPutArguments(Path filePath,
+      InputStream inputStream,
+      int partNumber,
+      UploadHandle uploadId,
+      long lengthInBytes) throws IllegalArgumentException {
+    checkArgument(filePath != null, "null filePath");
+    checkArgument(inputStream != null, "null inputStream");
+    checkArgument(partNumber > 0, "Invalid part number: %s", partNumber);
+    checkArgument(uploadId != null, "null uploadId");
+    checkArgument(lengthInBytes >= 0, "Invalid part length: %s", lengthInBytes);
+  }
 }

View File

@@ -52,6 +52,13 @@ public abstract class MultipartUploaderFactory {
     }
   }
 
+  /**
+   * Get the multipart uploader for a specific filesystem.
+   * @param fs filesystem
+   * @param conf configuration
+   * @return an uploader, or null if none was found.
+   * @throws IOException failure during the creation process.
+   */
   public static MultipartUploader get(FileSystem fs, Configuration conf)
       throws IOException {
     MultipartUploader mpu = null;

View File

@@ -36,3 +36,4 @@ HDFS as these are commonly expected by Hadoop client applications.
 1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
 2. [Testing with the Filesystem specification](testing.html)
 2. [Extending the specification and its tests](extending.html)
+1. [Uploading a file using Multiple Parts](multipartuploader.html)

View File

@@ -0,0 +1,235 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- ============================================================= -->
<!-- CLASS: MultipartUploader -->
<!-- ============================================================= -->
# class `org.apache.hadoop.fs.MultipartUploader`
<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
The abstract `MultipartUploader` class is the original class to upload a file
using multiple parts to Hadoop-supported filesystems. The benefit of a
multipart upload is that the file can be uploaded from multiple clients or
processes in parallel, and the results will not be visible to other clients
until the `complete` function is called.
When implemented by an object store, uploaded data may incur storage charges,
even before it is visible in the filesystem. Users of this API must be diligent
and always perform best-effort attempts to complete or abort the upload.
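
As an illustration (a sketch only, not itself part of the specification; the
`SinglePartUpload` class and its parameters are invented for this example), a
diligent client of this API either completes the upload or aborts it on
failure:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

/** Sketch of a diligent single-part upload: complete, or abort on failure. */
public final class SinglePartUpload {

  public static PathHandle upload(FileSystem fs, Configuration conf, Path dest)
      throws IOException {
    MultipartUploader mpu = MultipartUploaderFactory.get(fs, conf);
    UploadHandle upload = mpu.initialize(dest);
    try {
      byte[] part1 = "example data".getBytes(StandardCharsets.UTF_8);
      Map<Integer, PartHandle> parts = new HashMap<>();
      // parts may be uploaded in any order, possibly from other processes
      parts.put(1, mpu.putPart(dest, new ByteArrayInputStream(part1),
          1, upload, part1.length));
      // nothing is visible at dest until complete() succeeds
      return mpu.complete(dest, parts, upload);
    } catch (IOException e) {
      mpu.abort(dest, upload); // best-effort cleanup of stored parts
      throw e;
    }
  }
}
```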
## Invariants
All the requirements of a valid MultipartUploader are considered implicit
preconditions and postconditions:
all operations on a valid MultipartUploader MUST result in a new
MultipartUploader that is also valid.

The operations of a single multipart upload may take place across different
instances of a multipart uploader, across different processes and hosts.
It is therefore a requirement that:
1. All state needed to upload a part, complete an upload or abort an upload
must be contained within or retrievable from an upload handle.
1. If an upload handle is marshalled to another process, then, if the
receiving process has the correct permissions, it may participate in the
upload, by uploading one or more parts, by completing an upload, and/or by
aborting the upload.
## Concurrency
Multiple processes may upload parts of a multipart upload simultaneously.
If a call is made to `initialize(path)` to a destination where an active
upload is in progress, implementations MUST perform one of these two operations:
* Reject the call as a duplicate.
* Permit both to proceed, with the final output of the file being
that of _exactly one of the two uploads_.
Which upload succeeds is undefined. Users must not expect consistent
behavior across filesystems, across filesystem instances, or even
across different requests.
If a multipart upload is completed or aborted while a part upload is in progress,
the in-progress upload, if it has not completed, must not be included in
the final file, in whole or in part. Implementations SHOULD raise an error
in the `putPart()` operation.
## Model
A File System which supports Multipart Uploads extends the existing model
`(Directories, Files, Symlinks)` to one of `(Directories, Files, Symlinks, Uploads)`,
where `Uploads` is of type `Map[UploadHandle -> Map[PartHandle -> UploadPart]]`.
The `Uploads` element of the state tuple is a map of all active uploads.
```python
Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]]
```
An UploadHandle is a non-empty list of bytes.
```python
UploadHandle: List[byte]
len(UploadHandle) > 0
```
Clients *MUST* treat this as opaque. What is core to this feature's design is
that the handle is valid across clients: the handle may be serialized on host
`hostA`, deserialized on `hostB` and still used to extend or complete the upload.
```python
UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])
```
Similarly, the `PartHandle` type is also a non-empty list of opaque bytes, again, marshallable between hosts.
```python
PartHandle: List[byte]
```
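
Both handle types can therefore be marshalled between hosts. As a sketch
(reusing `mpu`, `dest` and the upload from the example above; the transport of
the bytes between hosts is left to the application), a handle can be marshalled
with `toByteArray()` and rebuilt with the `BBUploadHandle` helper:

```java
// hostA: marshal the upload handle for transmission elsewhere.
byte[] wire = upload.toByteArray();

// hostB: rebuild an equivalent handle and extend the upload with part 2.
UploadHandle rebuilt = BBUploadHandle.from(ByteBuffer.wrap(wire));
byte[] part2 = "more data".getBytes(StandardCharsets.UTF_8);
mpu.putPart(dest, new ByteArrayInputStream(part2), 2, rebuilt, part2.length);
```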
It is implicit that each `UploadHandle` in `FS.Uploads` is unique.
Similarly, each `PartHandle` in the map of `[PartHandle -> UploadPart]` must also be unique.
1. There is no requirement that Part Handles are unique across uploads.
1. There is no requirement that Upload Handles are unique over time.
However, if Upload Handles are rapidly recycled, there is a risk that the nominally
idempotent operation `abort(FS, uploadHandle)` could unintentionally cancel a
successor operation which used the same Upload Handle.
## State Changing Operations
### `UploadHandle initialize(Path path)`
Initializes a Multipart Upload, returning an upload handle for use in
subsequent operations.
#### Preconditions
```python
if path == "/" : raise IOException
if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
```
If a filesystem does not support concurrent uploads to a destination,
then the following precondition is added
```python
if path in values(FS.Uploads) raise PathExistsException, IOException
```
#### Postconditions
The outcome of this operation is that the filesystem state is updated with a new
active upload, with a new handle, this handle being returned to the caller.
```python
handle' = UploadHandle where not handle' in keys(FS.Uploads)
FS' = FS where FS'.Uploads(handle') == {}
result = handle'
```
### `PartHandle putPart(Path path, InputStream inputStream, int partNumber, UploadHandle uploadHandle, long lengthInBytes)`
Upload a part for the multipart upload.
#### Preconditions
```python
uploadHandle in keys(FS.Uploads)
partNumber >= 1
lengthInBytes >= 0
len(inputStream) >= lengthInBytes
```
#### Postconditions
```python
data' = inputStream(0..lengthInBytes)
partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
result = partHandle'
```
The data is stored in the filesystem, pending completion.
### `PathHandle complete(Path path, Map<Integer, PartHandle> parts, UploadHandle multipartUploadId)`
Complete the multipart upload.
A Filesystem may enforce a minimum size for each part, excluding the last part uploaded.
If a part is below this minimum size, an `IOException` MUST be raised.
#### Preconditions
```python
uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
FS.Uploads(uploadHandle).path == path
if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
parts.size() > 0
```
If there are handles in the MPU which aren't included in the map, then the omitted
parts will not be a part of the resulting file. It is up to the implementation
of the MultipartUploader to make sure the leftover parts are cleaned up.
In the case of backing stores that support directories (local filesystem, HDFS,
etc), if, at the point of completion, there is now a directory at the
destination then a `PathIsDirectoryException` or other `IOException` must be thrown.
#### Postconditions
```python
UploadData' == ordered concatenation of all data in the map of parts, ordered by key
exists(FS', path') and result = PathHandle(path')
FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
```
The PathHandle is returned by the complete operation so subsequent operations
will be able to identify that the data has not changed in the meantime.

The order of parts in the uploaded file is that of the natural order of
parts: part 1 is ahead of part 2, etc.
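
A store which does not order parts itself can derive that ordering by sorting
the map entries by key, as the `FileSystemMultipartUploader` and
`S3AMultipartUploader` in this patch both do:

```java
// Sort the part handles into their natural (part number) order
// before concatenating the parts.
List<Map.Entry<Integer, PartHandle>> handles =
    new ArrayList<>(handleMap.entrySet());
handles.sort(Comparator.comparingInt(Map.Entry::getKey));
```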
### `void abort(Path path, UploadHandle multipartUploadId)`
Abort a multipart upload. The handle becomes invalid and not subject to reuse.
#### Preconditions
```python
uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
```
#### Postconditions
The upload handle is no longer known.
```python
FS' = FS where not uploadHandle in keys(FS'.uploads)
```
A subsequent call to `abort()` with the same handle will fail, unless
the handle has been recycled.
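
The contract tests added in this patch pin down the failure modes: aborting an
unknown (but well-formed) handle raises `FileNotFoundException`, while a
zero-byte handle is rejected before the store is even consulted. A sketch,
using `intercept` from `LambdaTestUtils`:

```java
// An unknown handle: the upload never existed, or was already finalized.
intercept(FileNotFoundException.class,
    () -> mpu.abort(dest, BBUploadHandle.from(
        ByteBuffer.wrap("invalid-handle".getBytes(Charsets.UTF_8)))));

// An empty handle: invalid argument, checked before any store operation.
intercept(IllegalArgumentException.class,
    () -> mpu.abort(dest, BBUploadHandle.from(ByteBuffer.wrap(new byte[0]))));
```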

View File

@@ -23,15 +23,19 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
 
 import com.google.common.base.Charsets;
+import org.junit.Assume;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BBUploadHandle;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,11 +47,64 @@
 import org.apache.hadoop.fs.UploadHandle;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 public abstract class AbstractContractMultipartUploaderTest extends
     AbstractFSContractTestBase {
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class);
+
+  /**
+   * Size of very small uploads.
+   * Enough to be non empty, not big enough to cause delays on uploads.
+   */
+  protected static final int SMALL_FILE = 100;
+
+  private MultipartUploader mpu;
+  private MultipartUploader mpu2;
+  private final Random random = new Random();
+  private UploadHandle activeUpload;
+  private Path activeUploadPath;
+
+  protected String getMethodName() {
+    return methodName.getMethodName();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    Configuration conf = getContract().getConf();
+    mpu = MultipartUploaderFactory.get(getFileSystem(), conf);
+    mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    if (mpu != null && activeUpload != null) {
+      try {
+        mpu.abort(activeUploadPath, activeUpload);
+      } catch (FileNotFoundException ignored) {
+        /* this is fine */
+      } catch (Exception e) {
+        LOG.info("in teardown", e);
+      }
+    }
+    cleanupWithLogger(LOG, mpu, mpu2);
+    super.teardown();
+  }
+
+  /**
+   * Get a test path based on the method name.
+   * @return a path to use in the test
+   * @throws IOException failure to build the path name up.
+   */
+  protected Path methodPath() throws IOException {
+    return path(getMethodName());
+  }
+
   /**
    * The payload is the part number repeated for the length of the part.
    * This makes checking the correctness of the upload straightforward.
@@ -55,9 +112,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
    * @return the bytes to upload.
    */
   private byte[] generatePayload(int partNumber) {
-    int sizeInBytes = partSizeInBytes();
-    ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
-    for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
+    return generatePayload(partNumber, partSizeInBytes());
+  }
+
+  /**
+   * Generate a payload of a given size; part number is used
+   * for all the values.
+   * @param partNumber part number
+   * @param size size in bytes
+   * @return the bytes to upload.
+   */
+  private byte[] generatePayload(final int partNumber, final int size) {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    for (int i = 0; i < size / (Integer.SIZE / Byte.SIZE); ++i) {
       buffer.putInt(partNumber);
     }
     return buffer.array();
@@ -70,11 +137,14 @@ private byte[] generatePayload(int partNumber) {
    * @throws IOException failure to read or digest the file.
    */
   protected byte[] digest(Path path) throws IOException {
-    FileSystem fs = getFileSystem();
-    try (InputStream in = fs.open(path)) {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (InputStream in = getFileSystem().open(path)) {
       byte[] fdData = IOUtils.toByteArray(in);
       MessageDigest newDigest = DigestUtils.getMd5Digest();
-      return newDigest.digest(fdData);
+      byte[] digest = newDigest.digest(fdData);
+      return digest;
+    } finally {
+      timer.end("Download and digest of path %s", path);
     }
   }
@@ -92,75 +162,231 @@ protected int getTestPayloadCount() {
     return 10;
   }
 
+  /**
+   * How long in milliseconds for propagation of
+   * store changes, including update/delete/list
+   * to be everywhere.
+   * If 0: the FS is consistent.
+   * @return a time in milliseconds.
+   */
+  protected int timeToBecomeConsistentMillis() {
+    return 0;
+  }
+
+  /**
+   * Does a call to finalize an upload (either complete or abort) consume the
+   * uploadID immediately or is it reaped at a later point in time?
+   * @return true if the uploadID will be consumed immediately (and no longer
+   * reusable).
+   */
+  protected abstract boolean finalizeConsumesUploadIdImmediately();
+
+  /**
+   * Does the store support concurrent uploads to the same destination path?
+   * @return true if concurrent uploads are supported.
+   */
+  protected abstract boolean supportsConcurrentUploadsToSamePath();
+
+  /**
+   * Pick a multipart uploader from the index value.
+   * @param index index of upload
+   * @return an uploader
+   */
+  protected MultipartUploader mpu(int index) {
+    return (index % 2 == 0) ? mpu : mpu2;
+  }
+
+  /**
+   * Pick a multipart uploader at random.
+   * @return an uploader
+   */
+  protected MultipartUploader randomMpu() {
+    return mpu(random.nextInt(10));
+  }
+
   /**
    * Assert that a multipart upload is successful.
    * @throws Exception failure
    */
   @Test
   public void testSingleUpload() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testSingleUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
-    byte[] payload = generatePayload(1);
+    int size = SMALL_FILE;
+    byte[] payload = generatePayload(1, size);
     origDigest.update(payload);
-    InputStream is = new ByteArrayInputStream(payload);
-    PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
-        payload.length);
-    partHandles.add(Pair.of(1, partHandle));
-    PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
+    PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
+    partHandles.put(1, partHandle);
+    PathHandle fd = completeUpload(file, uploadHandle, partHandles,
         origDigest,
-        payload.length);
-    // Complete is idempotent
-    PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
-    assertArrayEquals("Path handles differ", fd.toByteArray(),
-        fd2.toByteArray());
+        size);
+    if (finalizeConsumesUploadIdImmediately()) {
+      intercept(FileNotFoundException.class,
+          () -> mpu.complete(file, partHandles, uploadHandle));
+    } else {
+      PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+      assertArrayEquals("Path handles differ", fd.toByteArray(),
+          fd2.toByteArray());
+    }
   }
 
-  private PathHandle completeUpload(final Path file,
-      final MultipartUploader mpu,
-      final UploadHandle uploadHandle,
-      final List<Pair<Integer, PartHandle>> partHandles,
-      final MessageDigest origDigest,
-      final int expectedLength) throws IOException {
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-
-    FileStatus status = verifyPathExists(getFileSystem(),
-        "Completed file", file);
-    assertEquals("length of " + status,
-        expectedLength, status.getLen());
-
-    assertArrayEquals("digest of source and " + file
-            + " differ",
-        origDigest.digest(), digest(file));
-    return fd;
-  }
+  /**
+   * Initialize an upload.
+   * This saves the path and upload handle as the active
+   * upload, for aborting in teardown.
+   * @param dest destination
+   * @return the handle
+   * @throws IOException failure to initialize
+   */
+  protected UploadHandle initializeUpload(final Path dest) throws IOException {
+    activeUploadPath = dest;
+    activeUpload = randomMpu().initialize(dest);
+    return activeUpload;
+  }
+
+  /**
+   * Generate then upload a part.
+   * @param file destination
+   * @param uploadHandle handle
+   * @param index index of part
+   * @param origDigest digest to build up. May be null
+   * @return the part handle
+   * @throws IOException IO failure.
+   */
+  protected PartHandle buildAndPutPart(
+      final Path file,
+      final UploadHandle uploadHandle,
+      final int index,
+      final MessageDigest origDigest) throws IOException {
+    byte[] payload = generatePayload(index);
+    if (origDigest != null) {
+      origDigest.update(payload);
+    }
+    return putPart(file, uploadHandle, index, payload);
+  }
+
+  /**
+   * Put a part.
+   * The entire byte array is uploaded.
+   * @param file destination
+   * @param uploadHandle handle
+   * @param index index of part
+   * @param payload byte array of payload
+   * @return the part handle
+   * @throws IOException IO failure.
+   */
+  protected PartHandle putPart(final Path file,
+      final UploadHandle uploadHandle,
+      final int index,
+      final byte[] payload) throws IOException {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    PartHandle partHandle = mpu(index)
+        .putPart(file,
+            new ByteArrayInputStream(payload),
+            index,
+            uploadHandle,
+            payload.length);
+    timer.end("Uploaded part %s", index);
+    LOG.info("Upload bandwidth {} MB/s",
+        timer.bandwidthDescription(payload.length));
+    return partHandle;
+  }
+
+  /**
+   * Complete an upload with the active MPU instance.
+   * @param file destination
+   * @param uploadHandle handle
+   * @param partHandles map of handles
+   * @param origDigest digest of source data (may be null)
+   * @param expectedLength expected length of result.
+   * @return the path handle from the upload.
+   * @throws IOException IO failure
+   */
+  private PathHandle completeUpload(final Path file,
+      final UploadHandle uploadHandle,
+      final Map<Integer, PartHandle> partHandles,
+      final MessageDigest origDigest,
+      final int expectedLength) throws IOException {
+    PathHandle fd = complete(file, uploadHandle, partHandles);
+
+    FileStatus status = verifyPathExists(getFileSystem(),
+        "Completed file", file);
+    assertEquals("length of " + status,
+        expectedLength, status.getLen());
+    if (origDigest != null) {
+      verifyContents(file, origDigest, expectedLength);
+    }
+    return fd;
+  }
+
+  /**
+   * Verify the contents of a file.
+   * @param file path
+   * @param origDigest digest
+   * @param expectedLength expected length (for logging B/W)
+   * @throws IOException IO failure
+   */
+  protected void verifyContents(final Path file,
+      final MessageDigest origDigest,
+      final int expectedLength) throws IOException {
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+    assertArrayEquals("digest of source and " + file
+            + " differ",
+        origDigest.digest(), digest(file));
+    timer2.end("Completed digest", file);
+    LOG.info("Download bandwidth {} MB/s",
+        timer2.bandwidthDescription(expectedLength));
+  }
+
+  /**
+   * Perform the inner complete without verification.
+   * @param file destination path
+   * @param uploadHandle upload handle
+   * @param partHandles map of parts
+   * @return the path handle from the upload.
+   * @throws IOException IO failure
+   */
+  private PathHandle complete(final Path file,
+      final UploadHandle uploadHandle,
+      final Map<Integer, PartHandle> partHandles) throws IOException {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle);
+    timer.end("Completed upload to %s", file);
+    return fd;
+  }
+
+  /**
+   * Abort an upload.
+   * @param file path
+   * @param uploadHandle handle
+   * @throws IOException failure
+   */
+  private void abortUpload(final Path file, UploadHandle uploadHandle)
+      throws IOException {
+    randomMpu().abort(file, uploadHandle);
+  }
 
   /**
    * Assert that a multipart upload is successful.
    * @throws Exception failure
    */
   @Test
   public void testMultipartUpload() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     final int payloadCount = getTestPayloadCount();
     for (int i = 1; i <= payloadCount; ++i) {
-      byte[] payload = generatePayload(i);
-      origDigest.update(payload);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+      PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
+          origDigest);
+      partHandles.put(i, partHandle);
     }
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+    completeUpload(file, uploadHandle, partHandles, origDigest,
         payloadCount * partSizeInBytes());
   }
@@ -173,17 +399,33 @@ public void testMultipartUpload() throws Exception {
   public void testMultipartUploadEmptyPart() throws Exception {
     FileSystem fs = getFileSystem();
     Path file = path("testMultipartUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    MessageDigest origDigest = DigestUtils.getMd5Digest();
-    byte[] payload = new byte[0];
-    origDigest.update(payload);
-    InputStream is = new ByteArrayInputStream(payload);
-    PartHandle partHandle = mpu.putPart(file, is, 0, uploadHandle,
-        payload.length);
-    partHandles.add(Pair.of(0, partHandle));
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest, 0);
+    try (MultipartUploader uploader =
+        MultipartUploaderFactory.get(fs, null)) {
+      UploadHandle uploadHandle = uploader.initialize(file);
+
+      Map<Integer, PartHandle> partHandles = new HashMap<>();
+      MessageDigest origDigest = DigestUtils.getMd5Digest();
+      byte[] payload = new byte[0];
+      origDigest.update(payload);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle,
+          payload.length);
+      partHandles.put(1, partHandle);
+      completeUpload(file, uploadHandle, partHandles, origDigest, 0);
+    }
+  }
+
+  /**
+   * Assert that an upload with a single empty part completes.
+   * @throws Exception failure
+   */
+  @Test
+  public void testUploadEmptyBlock() throws Exception {
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
+    completeUpload(file, uploadHandle, partHandles, null, 0);
   }
/** /**
@@ -192,11 +434,9 @@ public void testMultipartUploadEmptyPart() throws Exception {
   @Test
   public void testMultipartUploadReverseOrder() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUploadReverseOrder");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     final int payloadCount = getTestPayloadCount();
     for (int i = 1; i <= payloadCount; ++i) {
@@ -204,13 +444,9 @@ public void testMultipartUploadReverseOrder() throws Exception {
       origDigest.update(payload);
     }
     for (int i = payloadCount; i > 0; --i) {
-      byte[] payload = generatePayload(i);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+      partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+    completeUpload(file, uploadHandle, partHandles, origDigest,
         payloadCount * partSizeInBytes());
   }
@@ -222,25 +458,19 @@ public void testMultipartUploadReverseOrder() throws Exception {
   public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
       throws Exception {
     describe("Upload in reverse order and the part numbers are not contiguous");
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     int payloadCount = 2 * getTestPayloadCount();
     for (int i = 2; i <= payloadCount; i += 2) {
       byte[] payload = generatePayload(i);
       origDigest.update(payload);
     }
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     for (int i = payloadCount; i > 0; i -= 2) {
-      byte[] payload = generatePayload(i);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+      partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+    completeUpload(file, uploadHandle, partHandles, origDigest,
         getTestPayloadCount() * partSizeInBytes());
   }
@@ -251,19 +481,14 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
   @Test
   public void testMultipartUploadAbort() throws Exception {
     describe("Upload and then abort it before completing");
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUploadAbort");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    for (int i = 20; i >= 10; --i) {
-      byte[] payload = generatePayload(i);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    int end = 10;
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    for (int i = 12; i > 10; i--) {
+      partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    mpu.abort(file, uploadHandle);
+    abortUpload(file, uploadHandle);
 
     String contents = "ThisIsPart49\n";
     int len = contents.getBytes(Charsets.UTF_8).length;
@@ -275,6 +500,15 @@ public void testMultipartUploadAbort() throws Exception {
         () -> mpu.complete(file, partHandles, uploadHandle));
 
     assertPathDoesNotExist("Uploaded file should not exist", file);
+
+    // A second abort should be a FileNotFoundException if the UploadHandle is
+    // consumed by finalization operations (complete, abort).
+    if (finalizeConsumesUploadIdImmediately()) {
+      intercept(FileNotFoundException.class,
+          () -> abortUpload(file, uploadHandle));
+    } else {
+      abortUpload(file, uploadHandle);
+    }
   }
/** /**
@@ -282,13 +516,23 @@ public void testMultipartUploadAbort() throws Exception {
   @Test
   public void testAbortUnknownUpload() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testAbortUnknownUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    Path file = methodPath();
     ByteBuffer byteBuffer = ByteBuffer.wrap(
         "invalid-handle".getBytes(Charsets.UTF_8));
     UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
-    intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle));
+    intercept(FileNotFoundException.class,
+        () -> abortUpload(file, uploadHandle));
+  }
+
+  /**
+   * Aborting an upload which has no uploaded parts must succeed,
+   * and no file is created.
+   */
+  @Test
+  public void testAbortEmptyUpload() throws Exception {
+    describe("initialize upload and abort before uploading data");
+    Path file = methodPath();
+    abortUpload(file, initializeUpload(file));
+    assertPathDoesNotExist("Uploaded file should not exist", file);
   }
/** /**
@@ -296,13 +540,10 @@ public void testAbortUnknownUpload() throws Exception {
    */
   @Test
   public void testAbortEmptyUploadHandle() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testAbortEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
     UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
     intercept(IllegalArgumentException.class,
-        () -> mpu.abort(file, uploadHandle));
+        () -> abortUpload(methodPath(), uploadHandle));
   }
/** /**
@@ -311,26 +552,20 @@ public void testAbortEmptyUploadHandle() throws Exception {
   @Test
   public void testCompleteEmptyUpload() throws Exception {
     describe("Expect an empty MPU to fail, but still be abortable");
-    FileSystem fs = getFileSystem();
-    Path dest = path("testCompleteEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle handle = mpu.initialize(dest);
-    intercept(IOException.class,
-        () -> mpu.complete(dest, new ArrayList<>(), handle));
-    mpu.abort(dest, handle);
+    Path dest = methodPath();
+    UploadHandle handle = initializeUpload(dest);
+    intercept(IllegalArgumentException.class,
+        () -> mpu.complete(dest, new HashMap<>(), handle));
+    abortUpload(dest, handle);
   }
 
   /**
    * When we pass empty uploadID, putPart throws IllegalArgumentException.
-   * @throws Exception
    */
   @Test
   public void testPutPartEmptyUploadID() throws Exception {
     describe("Expect IllegalArgumentException when putPart uploadID is empty");
-    FileSystem fs = getFileSystem();
-    Path dest = path("testCompleteEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    mpu.initialize(dest);
+    Path dest = methodPath();
     UploadHandle emptyHandle =
         BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
     byte[] payload = generatePayload(1);
@@ -341,25 +576,123 @@ public void testPutPartEmptyUploadID() throws Exception {
 
   /**
    * When we pass empty uploadID, complete throws IllegalArgumentException.
-   * @throws Exception
    */
   @Test
   public void testCompleteEmptyUploadID() throws Exception {
     describe("Expect IllegalArgumentException when complete uploadID is empty");
-    FileSystem fs = getFileSystem();
-    Path dest = path("testCompleteEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle realHandle = mpu.initialize(dest);
+    Path dest = methodPath();
+    UploadHandle realHandle = initializeUpload(dest);
     UploadHandle emptyHandle =
         BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    byte[] payload = generatePayload(1);
-    InputStream is = new ByteArrayInputStream(payload);
-    PartHandle partHandle = mpu.putPart(dest, is, 1, realHandle,
-        payload.length);
-    partHandles.add(Pair.of(1, partHandle));
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    PartHandle partHandle = putPart(dest, realHandle, 1,
+        generatePayload(1, SMALL_FILE));
+    partHandles.put(1, partHandle);
 
     intercept(IllegalArgumentException.class,
         () -> mpu.complete(dest, partHandles, emptyHandle));
+
+    // and, while things are set up, attempt to complete with
+    // a part index of 0
+    partHandles.clear();
+    partHandles.put(0, partHandle);
+    intercept(IllegalArgumentException.class,
+        () -> mpu.complete(dest, partHandles, realHandle));
+  }
+
+  /**
+   * Assert that upon completion, a directory in the way of the file will
+   * result in a failure. This test only applies to backing stores with a
+   * concept of directories.
+   * @throws Exception failure
+   */
+  @Test
+  public void testDirectoryInTheWay() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    int size = SMALL_FILE;
+    PartHandle partHandle = putPart(file, uploadHandle, 1,
+        generatePayload(1, size));
+    partHandles.put(1, partHandle);
+
+    fs.mkdirs(file);
+    intercept(IOException.class,
+        () -> completeUpload(file, uploadHandle, partHandles, null,
+            size));
+    // abort should still work
+    abortUpload(file, uploadHandle);
+  }
+
+  @Test
+  public void testConcurrentUploads() throws Throwable {
+
+    // if the FS doesn't support concurrent uploads, this test is
+    // required to fail during the second initialization.
+    final boolean concurrent = supportsConcurrentUploadsToSamePath();
+    describe("testing concurrent uploads, MPU support for this is "
+        + concurrent);
+    final FileSystem fs = getFileSystem();
+    final Path file = methodPath();
+    final int size1 = SMALL_FILE;
+    final int partId1 = 1;
+    final byte[] payload1 = generatePayload(partId1, size1);
+    final MessageDigest digest1 = DigestUtils.getMd5Digest();
+    digest1.update(payload1);
+    final UploadHandle upload1 = initializeUpload(file);
+    final Map<Integer, PartHandle> partHandles1 = new HashMap<>();
+
+    // initiate part 2
+    // by using a different size, it's straightforward to see which
+    // version is visible, before reading/digesting the contents
+    final int size2 = size1 * 2;
+    final int partId2 = 2;
+    final byte[] payload2 = generatePayload(partId1, size2);
+    final MessageDigest digest2 = DigestUtils.getMd5Digest();
+    digest2.update(payload2);
+
+    final UploadHandle upload2;
+    try {
+      upload2 = initializeUpload(file);
+      Assume.assumeTrue(
+          "The Filesystem is unexpectedly supporting concurrent uploads",
+          concurrent);
+    } catch (IOException e) {
+      if (!concurrent) {
+        // this is expected, so end the test
+        LOG.debug("Expected exception raised on concurrent uploads", e);
+        return;
+      } else {
+        throw e;
+      }
+    }
+    final Map<Integer, PartHandle> partHandles2 = new HashMap<>();
+
+    assertNotEquals("Upload handles match", upload1, upload2);
+
+    // put part 1
+    partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
+
+    // put part 2
+    partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
+
+    // complete upload 1. expect its size and digest to
+    // be as expected.
+    completeUpload(file, upload1, partHandles1, digest1, size1);
+
+    // now complete upload 2.
+    complete(file, upload2, partHandles2);
+
+    // and await the visible length to match
+    eventually(timeToBecomeConsistentMillis(), 500,
+        () -> {
+          FileStatus status = fs.getFileStatus(file);
+          assertEquals("File length in " + status,
+              size2, status.getLen());
+        });
+
+    verifyContents(file, digest2, size2);
+  }
 }
} }

View File

@@ -40,4 +40,14 @@ protected AbstractFSContract createContract(Configuration conf) {
   protected int partSizeInBytes() {
     return 1024;
   }
+
+  @Override
+  protected boolean finalizeConsumesUploadIdImmediately() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsConcurrentUploadsToSamePath() {
+    return true;
+  }
 }

View File

@@ -21,6 +21,8 @@
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
@@ -32,6 +34,9 @@
 public class TestHDFSContractMultipartUploader extends
     AbstractContractMultipartUploaderTest {
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class);
+
   @BeforeClass
   public static void createCluster() throws IOException {
     HDFSContract.createCluster();
@@ -55,4 +60,14 @@ protected AbstractFSContract createContract(Configuration conf) {
   protected int partSizeInBytes() {
     return 1024;
   }
+
+  @Override
+  protected boolean finalizeConsumesUploadIdImmediately() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsConcurrentUploadsToSamePath() {
+    return true;
+  }
 }

View File

@@ -25,7 +25,9 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -68,10 +70,8 @@ public class S3AMultipartUploader extends MultipartUploader {
   public static final String HEADER = "S3A-part01";
 
   public S3AMultipartUploader(FileSystem fs, Configuration conf) {
-    if (!(fs instanceof S3AFileSystem)) {
-      throw new IllegalArgumentException(
-          "S3A MultipartUploads must use S3AFileSystem");
-    }
+    Preconditions.checkArgument(fs instanceof S3AFileSystem,
+        "Wrong filesystem: expected S3A but got %s", fs);
     s3a = (S3AFileSystem) fs;
   }
 
@@ -88,6 +88,8 @@ public UploadHandle initialize(Path filePath) throws IOException {
   public PartHandle putPart(Path filePath, InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
       throws IOException {
+    checkPutArguments(filePath, inputStream, partNumber, uploadId,
+        lengthInBytes);
     byte[] uploadIdBytes = uploadId.toByteArray();
     checkUploadId(uploadIdBytes);
     String key = s3a.pathToKey(filePath);
@@ -105,14 +107,16 @@ public PartHandle putPart(Path filePath, InputStream inputStream,
 
   @Override
   public PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId)
+      Map<Integer, PartHandle> handleMap,
+      UploadHandle uploadId)
       throws IOException {
     byte[] uploadIdBytes = uploadId.toByteArray();
     checkUploadId(uploadIdBytes);
-    if (handles.isEmpty()) {
-      throw new IOException("Empty upload");
-    }
+
+    checkPartHandles(handleMap);
+    List<Map.Entry<Integer, PartHandle>> handles =
+        new ArrayList<>(handleMap.entrySet());
+    handles.sort(Comparator.comparingInt(Map.Entry::getKey));
 
     final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
@@ -121,11 +125,11 @@ public PathHandle complete(Path filePath,
     ArrayList<PartETag> eTags = new ArrayList<>();
     eTags.ensureCapacity(handles.size());
     long totalLength = 0;
-    for (Pair<Integer, PartHandle> handle : handles) {
-      byte[] payload = handle.getRight().toByteArray();
+    for (Map.Entry<Integer, PartHandle> handle : handles) {
+      byte[] payload = handle.getValue().toByteArray();
       Pair<Long, String> result = parsePartHandlePayload(payload);
       totalLength += result.getLeft();
-      eTags.add(new PartETag(handle.getLeft(), result.getRight()));
+      eTags.add(new PartETag(handle.getKey(), result.getRight()));
     }
     AtomicInteger errorCount = new AtomicInteger(0);
     CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
@@ -172,7 +176,7 @@ static byte[] buildPartHandlePayload(String eTag, long len)
       throws IOException {
     Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
         "Empty etag");
-    Preconditions.checkArgument(len > 0,
+    Preconditions.checkArgument(len >= 0,
         "Invalid length");
 
     ByteArrayOutputStream bytes = new ByteArrayOutputStream();
@@ -190,6 +194,7 @@ static byte[] buildPartHandlePayload(String eTag, long len)
    * @return the length and etag
    * @throws IOException error reading the payload
    */
+  @VisibleForTesting
   static Pair<Long, String> parsePartHandlePayload(byte[] data)
       throws IOException {
 
@@ -201,7 +206,7 @@ static Pair<Long, String> parsePartHandlePayload(byte[] data)
     }
     final long len = input.readLong();
     final String etag = input.readUTF();
-    if (len <= 0) {
+    if (len < 0) {
       throw new IOException("Negative length");
     }
     return Pair.of(len, etag);

View File

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.fs.contract.s3a;
 
-import java.io.IOException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,6 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
@@ -35,6 +34,9 @@
 
 /**
  * Test MultipartUploader with S3A.
+ * Although not an S3A Scale test subclass, it uses the -Dscale option
+ * to enable it, and partition size option to control the size of
+ * parts uploaded.
  */
 public class ITestS3AContractMultipartUploader extends
     AbstractContractMultipartUploaderTest {
@@ -79,6 +81,35 @@ protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
 
+  /**
+   * Bigger test: use the scale timeout.
+   * @return the timeout for scale tests.
+   */
+  @Override
+  protected int getTestTimeoutMillis() {
+    return SCALE_TEST_TIMEOUT_MILLIS;
+  }
+
+  @Override
+  protected boolean supportsConcurrentUploadsToSamePath() {
+    return true;
+  }
+
+  /**
+   * Provide a pessimistic time to become consistent.
+   * @return a time in milliseconds
+   */
+  @Override
+  protected int timeToBecomeConsistentMillis() {
+    return 30 * 1000;
+  }
+
+  @Override
+  protected boolean finalizeConsumesUploadIdImmediately() {
+    return false;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -103,14 +134,29 @@ public void setup() throws Exception {
   public void teardown() throws Exception {
     Path teardown = path("teardown").getParent();
     S3AFileSystem fs = getFileSystem();
-    WriteOperationHelper helper = fs.getWriteOperationHelper();
-    try {
-      LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
-      int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown));
-      LOG.info("Found {} incomplete uploads", count);
-    } catch (IOException e) {
-      LOG.warn("IOE in teardown", e);
+    if (fs != null) {
+      WriteOperationHelper helper = fs.getWriteOperationHelper();
+      try {
+        LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+        int count = helper.abortMultipartUploadsUnderPath(
+            fs.pathToKey(teardown));
+        LOG.info("Found {} incomplete uploads", count);
+      } catch (Exception e) {
+        LOG.warn("Exception in teardown", e);
+      }
     }
     super.teardown();
   }
+
+  /**
+   * S3 has no concept of directories, so this test does not apply.
+   */
+  @Override
+  public void testDirectoryInTheWay() throws Exception {
+    // no-op
+  }
+
+  @Override
+  public void testMultipartUploadReverseOrder() throws Exception {
+    ContractTestUtils.skip("skipped for speed");
+  }
 }

View File

@@ -60,7 +60,7 @@ public void testNoEtag() throws Throwable {
   @Test
   public void testNoLen() throws Throwable {
     intercept(IllegalArgumentException.class,
-        () -> buildPartHandlePayload("tag", 0));
+        () -> buildPartHandlePayload("tag", -1));
   }
 
   @Test