HADOOP-16906. Abortable (#2684)

Adds an Abortable.abort() interface for streams to enable output streams to be terminated; this
is implemented by the S3A connector's output stream. It allows for commit protocols
to be implemented which commit/abort work by writing to the final destination and
using the abort() call to cancel any write which is not intended to be committed.
Consult the specification document for information about the interface and its use.

Contributed by Jungtaek Lim and Steve Loughran.

Change-Id: I7fcc25e9dd8c10ce6c29f383529f3a2642a201ae
This commit is contained in:
Steve Loughran 2021-02-11 17:37:20 +00:00
parent 73f9083f2a
commit 4423a7e736
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
22 changed files with 665 additions and 39 deletions

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Abort data being written to a stream, so that close() does
* not write the data. It is implemented by output streams in
* some object stores, and passed through {@link FSDataOutputStream}.
* <p>
* {@link FSDataOutputStream} forwards the call to its wrapped stream
* when that stream is itself {@code Abortable}, and otherwise raises
* an {@code UnsupportedOperationException}.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface Abortable {
/**
* Abort the active operation without the output becoming visible.
* <p>
* This is to provide ability to cancel the write on stream; once
* a stream is aborted, the write MUST NOT become visible.
*
* @throws UnsupportedOperationException if the operation is not supported.
* @return the result: whether the stream had already been closed/aborted,
* and any exception caught during cleanup.
*/
AbortableResult abort();
/**
* Interface for the result of aborts; allows subclasses to extend
* (IOStatistics etc) or for future enhancements if ever needed.
*/
interface AbortableResult {
/**
* Was the stream already closed/aborted?
* @return true if a close/abort operation had already
* taken place.
*/
boolean alreadyClosed();
/**
* Any exception caught during cleanup operations,
* exceptions whose raising/catching does not change
* the semantics of the abort.
* @return an exception or null; null means that no
* exception was caught.
*/
IOException anyCleanupException();
}
}

View File

@ -139,4 +139,11 @@ public final class CommonPathCapabilities {
public static final String FS_MULTIPART_UPLOADER =
"fs.capability.multipart.uploader";
/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* Value: {@value}.
*/
public static final String ABORTABLE_STREAM =
"fs.capability.outputstream.abortable";
}

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
IOStatisticsSource, Abortable {
private final OutputStream wrappedStream;
private static class PositionCache extends FilterOutputStream {
@ -168,4 +168,21 @@ public class FSDataOutputStream extends DataOutputStream
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}
/**
 * Abort the write by delegating to the wrapped stream.
 * The wrapped stream must itself implement {@code Abortable};
 * if it does not, an {@code UnsupportedOperationException} is raised.
 * @throws UnsupportedOperationException if the wrapped stream
 * is not Abortable.
 * @return the result returned by the wrapped stream.
 */
@Override
public AbortableResult abort() {
  // reject first, so that the delegation below is unconditional
  if (!(wrappedStream instanceof Abortable)) {
    throw new UnsupportedOperationException(
        FSExceptionMessages.ABORTABLE_UNSUPPORTED);
  }
  final Abortable abortable = (Abortable) wrappedStream;
  return abortable.abort();
}
}

View File

@ -51,4 +51,10 @@ public class FSExceptionMessages {
public static final String PERMISSION_DENIED_BY_STICKY_BIT =
"Permission denied by sticky bit";
/**
* A call was made to abort(), but it is not supported.
*/
public static final String ABORTABLE_UNSUPPORTED =
"Abortable.abort() is not supported";
}

View File

@ -80,6 +80,13 @@ public interface StreamCapabilities {
*/
String IOSTATISTICS = "iostatistics";
/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability
* {@link CommonPathCapabilities#ABORTABLE_STREAM}.
*/
String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;
/**
* Capabilities that a stream can support and be queried for.
*/

View File

@ -37,6 +37,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving
public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_ABORT = "op_abort";
/** {@value}. */
public static final String OP_APPEND = "op_append";

View File

@ -0,0 +1,186 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- ============================================================= -->
<!-- INTERFACE: Abortable -->
<!-- ============================================================= -->
# interface `org.apache.hadoop.fs.Abortable`
<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
Abort the active operation such that the output does not become
manifest.
Specifically, if supported on an [output stream](outputstream.html),
a successful `abort()` MUST guarantee that the stream will not be made visible in the `close()`
operation.
```java
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface Abortable {
/**
* Abort the active operation without the output becoming visible.
*
* This is to provide ability to cancel the write on stream; once
* a stream is aborted, the write MUST NOT become visible.
*
* @throws UnsupportedOperationException if the operation is not supported.
* @return the result.
*/
AbortableResult abort();
/**
* Interface for the result of aborts; allows subclasses to extend
* (IOStatistics etc) or for future enhancements if ever needed.
*/
interface AbortableResult {
/**
* Was the stream already closed/aborted?
* @return true if a close/abort operation had already
* taken place.
*/
boolean alreadyClosed();
/**
* Any exception caught during cleanup operations,
* exceptions whose raising/catching does not change
* the semantics of the abort.
* @return an exception or null.
*/
IOException anyCleanupException();
}
}
```
## Method `abort()`
Aborts the ongoing operation such that no output SHALL become visible
when the operation is completed.
Unless and until other File System classes implement `Abortable`, the
interface is specified purely for output streams.
## Method `abort()` on an output stream
`Abortable.abort()` MUST only be supported on output streams
whose output is only made visible when `close()` is called,
for example, output streams returned by the S3A FileSystem.
## Preconditions
The stream MUST implement `Abortable` and `StreamCapabilities`.
```python
if unsupported:
throw UnsupportedOperationException
if not isOpen(stream):
no-op
StreamCapabilities.hasCapability("fs.capability.outputstream.abortable") == True
```
## Postconditions
After `abort()` returns, the filesystem MUST be unchanged:
```
FS' = FS
```
A successful `abort()` operation MUST guarantee that
when the stream's `close()` is invoked no output shall be manifest.
* The stream MUST retry any remote calls needed to force the abort outcome.
* If any file was present at the destination path, it MUST remain unchanged.
Strictly then:
> if `Abortable.abort()` returns without raising `UnsupportedOperationException`,
> then it guarantees that the write SHALL NOT become visible
> and that any existing data in the filesystem at the destination path SHALL
> continue to be available.
1. Calls to `write()` methods MUST fail.
1. Calls to `flush()` MUST be no-ops (applications sometimes call this on closed streams)
1. Subsequent calls to `abort()` MUST be no-ops.
1. `close()` MUST NOT manifest the file, and MUST NOT raise an exception.
That is, the postconditions of `close()` become:
```
FS' = FS
```
### Cleanup
* If temporary data is stored in the local filesystem or in the store's upload
infrastructure then this MAY be cleaned up; best-effort is expected here.
* The stream SHOULD NOT retry cleanup operations; any failure there MUST be
caught and added to `AbortableResult`.
#### Returned `AbortableResult`
The `AbortableResult` value returned is primarily for testing and logging.
`alreadyClosed()`: MUST return `true` if the write had already been aborted or closed.
`anyCleanupException()`: SHOULD return any IOException raised during any optional
cleanup operations.
### Thread safety and atomicity
Output streams themselves aren't formally required to be thread safe,
but as applications do sometimes assume they are, this call MUST be thread safe.
## Path/Stream capability "fs.capability.outputstream.abortable"
An application MUST be able to verify that a stream supports the `Abortable.abort()`
operation without actually calling it. This is done through the `StreamCapabilities`
interface.
1. If a stream instance supports `Abortable` then it MUST return `true`
in the probe `hasCapability("fs.capability.outputstream.abortable")`
1. If a stream instance does not support `Abortable` then it MUST return `false`
in the probe `hasCapability("fs.capability.outputstream.abortable")`
That is: if a stream declares its support for the feature, a call to `abort()`
SHALL meet the defined semantics of the operation.
FileSystem/FileContext implementations SHOULD declare support similarly, to
allow for applications to probe for the feature in the destination directory/path.
If a filesystem supports `Abortable` under a path `P` then it SHOULD return `true` to
`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`
This is to allow applications to verify that the store supports the feature.
If a filesystem does not support `Abortable` under a path `P` then it MUST
return `false` to
`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`

View File

@ -33,6 +33,7 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [Model](model.html)
1. [FileSystem class](filesystem.html)
1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
1. [Abortable](abortable.html)
1. [FSDataInputStream class](fsdatainputstream.html)
1. [PathCapabilities interface](pathcapabilities.html)
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)

View File

@ -893,7 +893,7 @@ Object store streams MAY buffer the entire stream's output
until the final `close()` operation triggers a single `PUT` of the data
and materialization of the final output.
This significantly change's their behaviour compared to that of
This significantly changes their behaviour compared to that of
POSIX filesystems and that specified in this document.
#### Visibility of newly created objects
@ -961,6 +961,10 @@ is present: the act of instantiating the object, while potentially exhibiting
create inconsistency, is atomic. Applications may be able to use that fact
to their advantage.
The [Abortable](abortable.html) interface exposes this ability to abort an output
stream before its data is made visible, so it can be used for checkpointing and similar
operations.
## <a name="implementors"></a> Implementors notes.
### Always implement `Syncable` -even if just to throw `UnsupportedOperationException`

View File

@ -233,8 +233,8 @@ public class ContractTestUtils extends Assert {
public static void verifyFileContents(FileSystem fs,
Path path,
byte[] original) throws IOException {
assertIsFile(fs, path);
FileStatus stat = fs.getFileStatus(path);
assertIsFile(path, stat);
String statText = stat.toString();
assertEquals("wrong length " + statText, original.length, stat.getLen());
byte[] bytes = readDataset(fs, path, original.length);

View File

@ -25,6 +25,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@ -38,7 +39,6 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@ -49,11 +49,14 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Abortable;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@ -61,7 +64,9 @@ import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@ -79,7 +84,7 @@ import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream extends OutputStream implements
StreamCapabilities, IOStatisticsSource, Syncable {
StreamCapabilities, IOStatisticsSource, Syncable, Abortable {
private static final Logger LOG =
LoggerFactory.getLogger(S3ABlockOutputStream.class);
@ -171,7 +176,9 @@ class S3ABlockOutputStream extends OutputStream implements
this.key = key;
this.blockFactory = blockFactory;
this.blockSize = (int) blockSize;
this.statistics = statistics;
this.statistics = statistics != null
? statistics
: EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
// test instantiations may not provide statistics;
this.iostatistics = statistics != null
? statistics.getIOStatistics()
@ -421,19 +428,107 @@ class S3ABlockOutputStream extends OutputStream implements
// if this happened during a multipart upload, abort the
// operation, so as to not leave (billable) data
// pending on the bucket
if (multiPartUpload != null) {
multiPartUpload.abort();
}
maybeAbortMultipart();
writeOperationHelper.writeFailed(ioe);
throw ioe;
} finally {
cleanupWithLogger(LOG, block, blockFactory);
cleanupOnClose();
}
// Note end of write. This does not change the state of the remote FS.
writeOperationHelper.writeSuccessful(bytes);
}
/**
 * Common tail of the close and abort paths of the stream.
 * Closes any active block and the block factory, logs and
 * closes the statistics, then clears the active block.
 */
private synchronized void cleanupOnClose() {
  // same close order as a single varargs call: block, then factory
  cleanupWithLogger(LOG, getActiveBlock());
  cleanupWithLogger(LOG, blockFactory);
  // log the statistics before they are closed
  LOG.debug("Statistics: {}", statistics);
  cleanupWithLogger(LOG, statistics);
  clearActiveBlock();
}
// Note end of write. This does not change the state of the remote FS.
writeOperationHelper.writeSuccessful(bytes);
/**
 * Abort the multipart upload if there is one, on a best-effort basis.
 * The {@code multiPartUpload} field is reset to null once the
 * abort has been attempted.
 * @return any exception caught during the operation, or null if
 * there was no upload to abort.
 */
private synchronized IOException maybeAbortMultipart() {
  if (multiPartUpload == null) {
    // nothing in progress
    return null;
  }
  final IOException failure = multiPartUpload.abort();
  multiPartUpload = null;
  return failure;
}
/**
 * Move the stream into the closed state and abort any
 * active multipart upload.
 * If the stream had already been closed or aborted, nothing
 * is done and the result reports that fact.
 * @return the outcome
 */
@Override
public AbortableResult abort() {
  final boolean wasAlreadyClosed = closed.getAndSet(true);
  if (wasAlreadyClosed) {
    // already closed
    LOG.debug("Ignoring abort() as stream is already closed");
    return new AbortableResultImpl(true, null);
  }
  // the tracker covers the upload abort only; the final cleanup
  // runs after the tracker has been closed.
  try (DurationTracker ignored =
      statistics.trackDuration(INVOCATION_ABORT.getSymbol())) {
    final IOException cleanupFailure = maybeAbortMultipart();
    return new AbortableResultImpl(false, cleanupFailure);
  } finally {
    cleanupOnClose();
  }
}
/**
 * Immutable implementation of the abort result.
 */
private static final class AbortableResultImpl implements AbortableResult {

  /**
   * True if the stream had been closed/aborted before this abort.
   */
  private final boolean alreadyClosed;

  /**
   * Exception raised during non-essential cleanup actions
   * (i.e. MPU abort); null if there was none.
   */
  private final IOException anyCleanupException;

  /**
   * Constructor.
   * @param alreadyClosed Had the stream already been closed/aborted?
   * @param anyCleanupException Was any exception raised during cleanup?
   */
  private AbortableResultImpl(final boolean alreadyClosed,
      final IOException anyCleanupException) {
    this.alreadyClosed = alreadyClosed;
    this.anyCleanupException = anyCleanupException;
  }

  @Override
  public boolean alreadyClosed() {
    return alreadyClosed;
  }

  @Override
  public IOException anyCleanupException() {
    return anyCleanupException;
  }

  @Override
  public String toString() {
    final String prefix = AbortableResultImpl.class.getSimpleName() + "[";
    final StringJoiner text = new StringJoiner(", ", prefix, "]");
    text.add("alreadyClosed=" + alreadyClosed);
    text.add("anyCleanupException=" + anyCleanupException);
    return text.toString();
  }
}
/**
@ -548,6 +643,10 @@ class S3ABlockOutputStream extends OutputStream implements
case StreamCapabilities.IOSTATISTICS:
return true;
// S3A supports abort.
case StreamCapabilities.ABORTABLE_STREAM:
return true;
default:
return false;
}
@ -756,35 +855,43 @@ class S3ABlockOutputStream extends OutputStream implements
maybeRethrowUploadFailure();
AtomicInteger errorCount = new AtomicInteger(0);
try {
trackDurationOfInvocation(statistics,
MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> {
writeOperationHelper.completeMPUwithRetries(key,
uploadId,
partETags,
bytesSubmitted,
errorCount);
});
} finally {
statistics.exceptionInMultipartComplete(errorCount.get());
}
}
/**
* Abort a multi-part upload. Retries are attempted on failures.
* Abort a multi-part upload. Retries are not attempted on failures.
* IOExceptions are caught; this is expected to be run as a cleanup process.
* @return any caught exception.
*/
public void abort() {
private IOException abort() {
LOG.debug("Aborting upload");
fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
cancelAllActiveFutures();
try {
trackDurationOfInvocation(statistics,
OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
cancelAllActiveFutures();
writeOperationHelper.abortMultipartUpload(key, uploadId,
(text, e, r, i) -> statistics.exceptionInMultipartAbort());
false, null);
});
return null;
} catch (IOException e) {
// this point is only reached if the operation failed more than
// the allowed retry count
LOG.warn("Unable to abort multipart upload,"
+ " you may need to purge uploaded parts", e);
statistics.exceptionInMultipartAbort();
return e;
}
}
}
/**

View File

@ -4724,6 +4724,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
ETAG_CHECKSUM_ENABLED_DEFAULT);
case CommonPathCapabilities.ABORTABLE_STREAM:
case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
return true;

View File

@ -1350,7 +1350,11 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
.withGauges(
STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
.withDurationTracking(ACTION_EXECUTOR_ACQUIRED)
.withDurationTracking(
ACTION_EXECUTOR_ACQUIRED,
INVOCATION_ABORT.getSymbol(),
OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(),
MULTIPART_UPLOAD_COMPLETED.getSymbol())
.build();
setIOStatistics(st);
// these are extracted to avoid lookups on heavily used counters.

View File

@ -88,6 +88,11 @@ public enum Statistic {
TYPE_COUNTER),
IGNORED_ERRORS("ignored_errors", "Errors caught and ignored",
TYPE_COUNTER),
INVOCATION_ABORT(
StoreStatisticNames.OP_ABORT,
"Calls of abort()",
TYPE_DURATION),
INVOCATION_COPY_FROM_LOCAL_FILE(
StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
"Calls of copyFromLocalFile()",

View File

@ -336,14 +336,17 @@ public class WriteOperationHelper implements WriteOperations {
* Abort a multipart upload operation.
* @param destKey destination key of the upload
* @param uploadId multipart operation Id
* @param shouldRetry should failures trigger a retry?
* @param retrying callback invoked on every retry
* @throws IOException failure to abort
* @throws FileNotFoundException if the abort ID is unknown
*/
@Retries.RetryTranslated
public void abortMultipartUpload(String destKey, String uploadId,
Retried retrying)
boolean shouldRetry, Retried retrying)
throws IOException {
if (shouldRetry) {
// retrying option
invoker.retry("Aborting multipart upload ID " + uploadId,
destKey,
true,
@ -351,6 +354,14 @@ public class WriteOperationHelper implements WriteOperations {
() -> owner.abortMultipartUpload(
destKey,
uploadId));
} else {
// single pass attempt.
once("Aborting multipart upload ID " + uploadId,
destKey,
() -> owner.abortMultipartUpload(
destKey,
uploadId));
}
}
/**
@ -401,7 +412,7 @@ public class WriteOperationHelper implements WriteOperations {
@Retries.RetryTranslated
public void abortMultipartCommit(String destKey, String uploadId)
throws IOException {
abortMultipartUpload(destKey, uploadId, invoker.getRetryCallback());
abortMultipartUpload(destKey, uploadId, true, invoker.getRetryCallback());
}
/**

View File

@ -154,13 +154,14 @@ public interface WriteOperations {
* Abort a multipart upload operation.
* @param destKey destination key of the upload
* @param uploadId multipart operation Id
* @param shouldRetry should failures trigger a retry?
* @param retrying callback invoked on every retry
* @throws IOException failure to abort
* @throws FileNotFoundException if the abort ID is unknown
*/
@Retries.RetryTranslated
void abortMultipartUpload(String destKey, String uploadId,
Invoker.Retried retrying)
boolean shouldRetry, Invoker.Retried retrying)
throws IOException;
/**

View File

@ -1622,7 +1622,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
if (mode == Mode.ABORT) {
getFilesystem().getWriteOperationHelper()
.abortMultipartUpload(upload.getKey(), upload.getUploadId(),
LOG_EVENT);
true, LOG_EVENT);
}
}
if (mode != Mode.EXPECT || verbose) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
@ -32,7 +33,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
/**
* Tests small file upload functionality for
@ -155,4 +159,51 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
markAndResetDatablock(createFactory(getFileSystem()));
}
/**
* Write a small dataset to a stream, abort it, and verify that no
* file exists at the destination, both straight after the abort and
* after the stream has been closed.
* Also verifies that the path and the stream declare the abortable
* capability, and that every abort() after the first one is reported
* as a no-op, including one issued after close().
*/
@Test
public void testAbortAfterWrite() throws Throwable {
describe("Verify abort after a write does not create a file");
Path dest = path(getMethodName());
FileSystem fs = getFileSystem();
// the filesystem must declare the capability for the destination path
ContractTestUtils.assertHasPathCapabilities(fs, dest, ABORTABLE_STREAM);
FSDataOutputStream stream = fs.create(dest, true);
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
try {
// and so must the stream itself
ContractTestUtils.assertCapabilities(stream,
new String[]{ABORTABLE_STREAM},
null);
stream.write(data);
// first abort: not already closed, no cleanup exception
assertCompleteAbort(stream.abort());
// second attempt is harmless
assertNoopAbort(stream.abort());
// the path should not exist
ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
} finally {
IOUtils.closeStream(stream);
// check the path doesn't exist "after" closing stream
ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
}
// and it can be called on the stream after being closed.
assertNoopAbort(stream.abort());
}
/**
* A stream which is abort()ed again after it has been close()d
* returns a result indicating that nothing happened.
* The stream is written to, aborted, explicitly closed, and then
* aborted a second time; try-with-resources closes it once more.
* NOTE(review): the first abort() happens before close(), so this does
* not cover abort() after a close() which completed a successful
* write; confirm whether that case was the intent.
*/
@Test
public void testAbortAfterCloseIsHarmless() throws Throwable {
describe("Verify abort on a closed stream is harmless "
+ "and that the result indicates that nothing happened");
Path dest = path(getMethodName());
FileSystem fs = getFileSystem();
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
try (FSDataOutputStream stream = fs.create(dest, true)) {
stream.write(data);
// the first abort does the work
assertCompleteAbort(stream.abort());
stream.close();
// the stream is now aborted and closed: this must be a no-op
assertNoopAbort(stream.abort());
}
}
}

View File

@ -88,7 +88,7 @@ public final class MultipartTestUtils {
while (uploads.hasNext()) {
MultipartUpload upload = uploads.next();
fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(),
upload.getUploadId(), LOG_EVENT);
upload.getUploadId(), true, LOG_EVENT);
LOG.debug("Cleaning up upload: {} {}", upload.getKey(),
truncatedUploadId(upload.getUploadId()));
}

View File

@ -82,4 +82,30 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
() -> woh.newUploadPartRequest(key,
"uploadId", 50000, 1024, inputStream, null, 0L));
}
/**
* Exception thrown by the stubbed {@code checkOpen()} in
* {@code testStreamClosedAfterAbort()}; a dedicated subclass lets the
* test intercept exactly this exception rather than any IOException.
*/
static class StreamClosedException extends IOException {}
/**
* After abort(), {@code checkOpen()} must raise an IOException, and
* {@code write()} must surface whatever {@code checkOpen()} throws.
* NOTE(review): the doThrow().when(stream) stubbing presumes that
* {@code stream} is a Mockito spy/mock set up elsewhere in this
* class; confirm against the test setup.
*/
@Test
public void testStreamClosedAfterAbort() throws Exception {
stream.abort();
// This verification replaces testing various operations after calling
// abort: after calling abort, stream is closed like calling close().
intercept(IOException.class, () -> stream.checkOpen());
// check that calling write() will call checkOpen() and throws exception
doThrow(new StreamClosedException()).when(stream).checkOpen();
intercept(StreamClosedException.class,
() -> stream.write(new byte[] {'a', 'b', 'c'}));
}
/**
* close() after abort() must complete without raising an exception;
* the test fails only if close() throws.
*/
@Test
public void testCallingCloseAfterCallingAbort() throws Exception {
stream.abort();
// This shouldn't throw IOException like calling close() multiple times.
// This will ensure abort() can be called with try-with-resource.
stream.close();
}
}

View File

@ -25,19 +25,33 @@ import org.junit.Test;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_ABORT;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@ -118,4 +132,83 @@ public class ITestS3AMultipartUploadSizeLimits extends S3AScaleTestBase {
describedAs("commit abort count")
.isEqualTo(initial + 1);
}
/**
* Write 6 MB to a stream, abort it, and verify that the destination
* path does not exist after the abort or after the stream is closed.
* The stream and filesystem statistics are then checked for the
* abort, and a further abort() must be a no-op.
* NOTE(review): presumably 6 MB exceeds the multipart size configured
* for this suite, so that a multipart upload is in progress when
* abort() is called; confirm against the test configuration.
*/
@Test
public void testAbortAfterTwoPartUpload() throws Throwable {
Path file = path(getMethodName());
byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
S3AFileSystem fs = getFileSystem();
FSDataOutputStream stream = fs.create(file, true);
try {
stream.write(data);
// From testTwoPartUpload() we know closing stream will finalize uploads
// and materialize the path. Here we call abort() to abort the upload,
// and ensure the path is NOT available. (uploads are aborted)
assertCompleteAbort(stream.abort());
// the path should not exist
assertPathDoesNotExist("upload must not have completed", file);
} finally {
IOUtils.closeStream(stream);
// check the path doesn't exist "after" closing stream
assertPathDoesNotExist("upload must not have completed", file);
}
verifyStreamWasAborted(fs, stream);
// a second abort is a no-op
assertNoopAbort(stream.abort());
}
/**
* Aborting a stream which is overwriting an existing file must leave
* the original file untouched.
* A small text file is written first; a 6 MB overwrite of the same
* path is then aborted, and the file contents are verified to still
* be the original data both before and after the stream is closed.
*/
@Test
public void testAbortWhenOverwritingAFile() throws Throwable {
Path file = path(getMethodName());
S3AFileSystem fs = getFileSystem();
// write the original data
byte[] smallData = writeTextFile(fs, file, "original", true);
// now attempt a multipart upload
byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
FSDataOutputStream stream = fs.create(file, true);
try {
ContractTestUtils.assertCapabilities(stream,
new String[]{ABORTABLE_STREAM},
null);
stream.write(data);
assertCompleteAbort(stream.abort());
// the original file must still be there, unchanged
verifyFileContents(fs, file, smallData);
} finally {
IOUtils.closeStream(stream);
}
// closing the aborted stream must not have replaced the file either
verifyFileContents(fs, file, smallData);
verifyStreamWasAborted(fs, stream);
}
/**
 * Use the IOStatistics of the stream and of the filesystem to
 * verify that a stream was aborted: the stream must record exactly
 * one abort() invocation and one aborted multipart upload, and the
 * filesystem at least one abort() invocation.
 * @param fs filesystem
 * @param stream stream
 */
private void verifyStreamWasAborted(final S3AFileSystem fs,
    final FSDataOutputStream stream) {
  // stream-level statistics
  final IOStatistics streamStats = stream.getIOStatistics();
  LOG.info("IOStatistics for stream: {}",
      ioStatisticsToPrettyString(streamStats));
  verifyStatisticCounterValue(streamStats,
      INVOCATION_ABORT.getSymbol(), 1);
  verifyStatisticCounterValue(streamStats,
      OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), 1);

  // filesystem-level statistics
  final IOStatistics filesystemStats = fs.getIOStatistics();
  assertThatStatisticCounter(filesystemStats,
      INVOCATION_ABORT.getSymbol())
      .isGreaterThanOrEqualTo(1);
}
}

View File

@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Abortable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -149,4 +151,31 @@ public final class ExtraAssertions {
throw e;
}
}
/**
 * Assert that an abort did real work and did it cleanly:
 * the stream had not already been closed/aborted, and no
 * exception was caught during cleanup.
 * @param result result to assert over
 */
public static void assertCompleteAbort(
    Abortable.AbortableResult result) {
  Assertions.assertThat(result)
      .describedAs("Abort operation result %s", result)
      .matches(outcome -> !outcome.alreadyClosed())
      .matches(outcome -> outcome.anyCleanupException() == null);
}
/**
 * Assert that an abort did nothing because the stream
 * had already been closed or aborted.
 * @param result result to assert over
 */
public static void assertNoopAbort(
    Abortable.AbortableResult result) {
  Assertions.assertThat(result)
      .describedAs("Abort operation result %s", result)
      .matches(Abortable.AbortableResult::alreadyClosed);
}
}