HADOOP-16906. Abortable (#2684)
Adds an Abortable.abort() interface for streams to enable output streams to be terminated; this is implemented by the S3A connector's output stream. It allows for commit protocols to be implemented which commit/abort work by writing to the final destination and using the abort() call to cancel any write which is not intended to be committed. Consult the specification document for information about the interface and its use. Contributed by Jungtaek Lim and Steve Loughran.
parent 98ca6afd17
commit 78905d7e3f
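
As a sketch of the commit-protocol pattern the message describes (hypothetical helper, not part of this patch: `writeTask` and its parameters are invented for illustration), a task writes to the final destination and uses `abort()` to cancel any write that is not intended to be committed:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Hypothetical sketch of an abort-based commit protocol:
 * each task writes to its final destination; commit is close(),
 * cancel is abort() followed by a (harmless) close().
 */
public final class AbortableCommitSketch {
  public static void writeTask(FileSystem fs, Path dest, byte[] data,
      boolean commit) throws IOException {
    FSDataOutputStream out = fs.create(dest, true);
    try {
      out.write(data);
      if (!commit) {
        out.abort(); // cancel: the write MUST NOT become visible
      }
    } finally {
      out.close(); // commits the output unless abort() was called
    }
  }
}
```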
@@ -0,0 +1,67 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Abort data being written to a stream, so that close() does
 * not write the data. It is implemented by output streams in
 * some object stores, and passed through {@link FSDataOutputStream}.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface Abortable {

  /**
   * Abort the active operation without the output becoming visible.
   *
   * This is to provide ability to cancel the write on stream; once
   * a stream is aborted, the write MUST NOT become visible.
   *
   * @throws UnsupportedOperationException if the operation is not supported.
   * @return the result.
   */
  AbortableResult abort();

  /**
   * Interface for the result of aborts; allows subclasses to extend
   * (IOStatistics etc) or for future enhancements if ever needed.
   */
  interface AbortableResult {

    /**
     * Was the stream already closed/aborted?
     * @return true if a close/abort operation had already
     * taken place.
     */
    boolean alreadyClosed();

    /**
     * Any exception caught during cleanup operations,
     * exceptions whose raising/catching does not change
     * the semantics of the abort.
     * @return an exception or null.
     */
    IOException anyCleanupException();
  }
}
@@ -139,4 +139,11 @@ private CommonPathCapabilities() {
  public static final String FS_MULTIPART_UPLOADER =
      "fs.capability.multipart.uploader";

  /**
   * Stream abort() capability implemented by {@link Abortable#abort()}.
   * Value: {@value}.
   */
  public static final String ABORTABLE_STREAM =
      "fs.capability.outputstream.abortable";
}
@@ -35,7 +35,7 @@
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
    implements Syncable, CanSetDropBehind, StreamCapabilities,
    IOStatisticsSource {
    IOStatisticsSource, Abortable {
  private final OutputStream wrappedStream;

  private static class PositionCache extends FilterOutputStream {
@@ -168,4 +168,21 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
  public IOStatistics getIOStatistics() {
    return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
  }

  /**
   * Invoke {@code abort()} on the wrapped stream if it
   * is Abortable, otherwise raise an
   * {@code UnsupportedOperationException}.
   * @throws UnsupportedOperationException if not available.
   * @return the result.
   */
  @Override
  public AbortableResult abort() {
    if (wrappedStream instanceof Abortable) {
      return ((Abortable) wrappedStream).abort();
    } else {
      throw new UnsupportedOperationException(
          FSExceptionMessages.ABORTABLE_UNSUPPORTED);
    }
  }
}
@@ -51,4 +51,10 @@ public class FSExceptionMessages {

  public static final String PERMISSION_DENIED_BY_STICKY_BIT =
      "Permission denied by sticky bit";

  /**
   * A call was made to abort(), but it is not supported.
   */
  public static final String ABORTABLE_UNSUPPORTED =
      "Abortable.abort() is not supported";
}
@@ -80,6 +80,13 @@ public interface StreamCapabilities {
   */
  String IOSTATISTICS = "iostatistics";

  /**
   * Stream abort() capability implemented by {@link Abortable#abort()}.
   * This matches the Path Capability
   * {@link CommonPathCapabilities#ABORTABLE_STREAM}.
   */
  String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;

  /**
   * Capabilities that a stream can support and be queried for.
   */
@@ -37,6 +37,9 @@
@InterfaceStability.Evolving
public final class StoreStatisticNames {

  /** {@value}. */
  public static final String OP_ABORT = "op_abort";

  /** {@value}. */
  public static final String OP_APPEND = "op_append";

@@ -0,0 +1,186 @@
<!---
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- ============================================================= -->
<!-- CLASS: FileSystem -->
<!-- ============================================================= -->

# interface `org.apache.hadoop.fs.Abortable`

<!-- MACRO{toc|fromDepth=1|toDepth=2} -->

Abort the active operation such that the output does not become
manifest.

Specifically, if supported on an [output stream](outputstream.html),
a successful `abort()` MUST guarantee that the stream will not be made visible in the `close()`
operation.

```java
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface Abortable {

  /**
   * Abort the active operation without the output becoming visible.
   *
   * This is to provide ability to cancel the write on stream; once
   * a stream is aborted, the write MUST NOT become visible.
   *
   * @throws UnsupportedOperationException if the operation is not supported.
   * @return the result.
   */
  AbortableResult abort();

  /**
   * Interface for the result of aborts; allows subclasses to extend
   * (IOStatistics etc) or for future enhancements if ever needed.
   */
  interface AbortableResult {

    /**
     * Was the stream already closed/aborted?
     * @return true if a close/abort operation had already
     * taken place.
     */
    boolean alreadyClosed();

    /**
     * Any exception caught during cleanup operations,
     * exceptions whose raising/catching does not change
     * the semantics of the abort.
     * @return an exception or null.
     */
    IOException anyCleanupException();
  }
}
```

## Method `abort()`

Aborts the ongoing operation such that no output SHALL become visible
when the operation is completed.

Unless and until other File System classes implement `Abortable`, the
interface is specified purely for output streams.
## Method `abort()` on an output stream

`Abortable.abort()` MUST only be supported on output streams
whose output is only made visible when `close()` is called,
for example, output streams returned by the S3A FileSystem.

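As a usage sketch (hedged: `writeThenAbort` is an illustrative helper, not part of this specification), a client can probe the stream for the capability and abort a pending write:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

/** Sketch: write some bytes, then abort so no output becomes visible. */
public static void writeThenAbort(FileSystem fs, Path dest)
    throws IOException {
  FSDataOutputStream out = fs.create(dest, true);
  try {
    if (out.hasCapability(StreamCapabilities.ABORTABLE_STREAM)) {
      out.write("discarded".getBytes(StandardCharsets.UTF_8));
      out.abort(); // the write MUST NOT become visible
    }
  } finally {
    out.close(); // a no-op after abort(); would commit otherwise
  }
}
```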
## Preconditions

The stream MUST implement `Abortable` and `StreamCapabilities`.

```python
if unsupported:
  throw UnsupportedException

if not isOpen(stream):
  no-op

StreamCapabilities.hasCapability("fs.capability.outputstream.abortable") == True
```

## Postconditions

After `abort()` returns, the filesystem MUST be unchanged:

```
FS' = FS
```

A successful `abort()` operation MUST guarantee that
when the stream's `close()` is invoked no output shall be manifest.

* The stream MUST retry any remote calls needed to force the abort outcome.
* If any file was present at the destination path, it MUST remain unchanged.

Strictly then:

> if `Abortable.abort()` does not raise `UnsupportedOperationException`
> and returns, then it guarantees that the write SHALL NOT become visible
> and that any existing data in the filesystem at the destination path SHALL
> continue to be available.

1. Calls to `write()` methods MUST fail.
1. Calls to `flush()` MUST be no-ops (applications sometimes call this on closed streams)
1. Subsequent calls to `abort()` MUST be no-ops.
1. `close()` MUST NOT manifest the file, and MUST NOT raise an exception

That is, the postconditions of `close()` become:

```
FS' = FS
```

### Cleanup

* If temporary data is stored in the local filesystem or in the store's upload
infrastructure then this MAY be cleaned up; best-effort is expected here.

* The stream SHOULD NOT retry cleanup operations; any failure there MUST be
caught and added to the returned `AbortableResult`.

#### Returned `AbortableResult`

The `AbortableResult` value returned is primarily for testing and logging.

`alreadyClosed()`: MUST return `true` if the write had already been aborted or closed;

`anyCleanupException()`: SHOULD return any IOException raised during any optional
cleanup operations.

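By way of illustration (a sketch only; `out` is assumed to be an open `Abortable` stream and `LOG` an SLF4J logger, neither defined here), a caller can inspect the result:

```java
Abortable.AbortableResult result = out.abort();
if (result.alreadyClosed()) {
  // no-op: the stream had already been closed or aborted
} else if (result.anyCleanupException() != null) {
  // the abort semantics still hold; cleanup is best-effort only
  LOG.warn("cleanup failure during abort", result.anyCleanupException());
}
```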
### Thread safety and atomicity

Output streams themselves aren't formally required to be thread safe,
but as applications do sometimes assume they are, this call MUST be thread safe.

## Path/Stream capability "fs.capability.outputstream.abortable"

An application MUST be able to verify that a stream supports the `Abortable.abort()`
operation without actually calling it. This is done through the `StreamCapabilities`
interface.

1. If a stream instance supports `Abortable` then it MUST return `true`
in the probe `hasCapability("fs.capability.outputstream.abortable")`.

1. If a stream instance does not support `Abortable` then it MUST return `false`
in the probe `hasCapability("fs.capability.outputstream.abortable")`.

That is: if a stream declares its support for the feature, a call to `abort()`
SHALL meet the defined semantics of the operation.

FileSystem/FileContext implementations SHOULD declare support similarly, to
allow for applications to probe for the feature in the destination directory/path.

If a filesystem supports `Abortable` under a path `P` then it SHOULD return `true` to
`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`.
This is to allow applications to verify that the store supports the feature.

If a filesystem does not support `Abortable` under a path `P` then it MUST
return `false` to
`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`.

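A sketch of such a probe (assuming `fs` and `dest` are in scope and `hasPathCapability` is available, as it is on Hadoop 3.3+ `FileSystem` instances; the fallback branch is illustrative):

```java
import org.apache.hadoop.fs.CommonPathCapabilities;

// Probe the destination path before committing to an abort-based strategy.
if (fs.hasPathCapability(dest, CommonPathCapabilities.ABORTABLE_STREAM)) {
  // streams created under dest can be expected to support abort()
} else {
  // fall back to, e.g., a rename-based commit protocol
}
```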
@@ -33,6 +33,7 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [Model](model.html)
1. [FileSystem class](filesystem.html)
1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
1. [Abortable](abortable.html)
1. [FSDataInputStream class](fsdatainputstream.html)
1. [PathCapabilities interface](pathcapabilities.html)
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
@@ -893,7 +893,7 @@ Object store streams MAY buffer the entire stream's output
until the final `close()` operation triggers a single `PUT` of the data
and materialization of the final output.

This significantly change's their behaviour compared to that of
This significantly changes their behaviour compared to that of
POSIX filesystems and that specified in this document.

#### Visibility of newly created objects
@@ -961,6 +961,10 @@ is present: the act of instantiating the object, while potentially exhibiting
create inconsistency, is atomic. Applications may be able to use that fact
to their advantage.

The [Abortable](abortable.html) interface exposes this ability to abort an output
stream before its data is made visible, so can be used for checkpointing and similar
operations.

## <a name="implementors"></a> Implementors notes.

### Always implement `Syncable` -even if just to throw `UnsupportedOperationException`
@@ -233,8 +233,8 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
  public static void verifyFileContents(FileSystem fs,
      Path path,
      byte[] original) throws IOException {
    assertIsFile(fs, path);
    FileStatus stat = fs.getFileStatus(path);
    assertIsFile(path, stat);
    String statText = stat.toString();
    assertEquals("wrong length " + statText, original.length, stat.getLen());
    byte[] bytes = readDataset(fs, path, original.length);
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +39,6 @@
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;

import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -49,11 +49,14 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Abortable;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@@ -61,7 +64,9 @@

import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
@@ -79,7 +84,7 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream extends OutputStream implements
    StreamCapabilities, IOStatisticsSource, Syncable {
    StreamCapabilities, IOStatisticsSource, Syncable, Abortable {

  private static final Logger LOG =
      LoggerFactory.getLogger(S3ABlockOutputStream.class);
@@ -171,7 +176,9 @@ class S3ABlockOutputStream extends OutputStream implements
    this.key = key;
    this.blockFactory = blockFactory;
    this.blockSize = (int) blockSize;
    this.statistics = statistics;
    this.statistics = statistics != null
        ? statistics
        : EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
    // test instantiations may not provide statistics;
    this.iostatistics = statistics != null
        ? statistics.getIOStatistics()
@@ -421,19 +428,107 @@ public void close() throws IOException {
      // if this happened during a multipart upload, abort the
      // operation, so as to not leave (billable) data
      // pending on the bucket
      if (multiPartUpload != null) {
        multiPartUpload.abort();
      }
      maybeAbortMultipart();
      writeOperationHelper.writeFailed(ioe);
      throw ioe;
    } finally {
      cleanupWithLogger(LOG, block, blockFactory);
      cleanupOnClose();
    }
    // Note end of write. This does not change the state of the remote FS.
    writeOperationHelper.writeSuccessful(bytes);
  }

  /**
   * Final operations in close/abort of stream.
   * Shuts down block factory, closes any active block,
   * and pushes out statistics.
   */
  private synchronized void cleanupOnClose() {
    cleanupWithLogger(LOG, getActiveBlock(), blockFactory);
    LOG.debug("Statistics: {}", statistics);
    cleanupWithLogger(LOG, statistics);
    clearActiveBlock();
  }
    // Note end of write. This does not change the state of the remote FS.
    writeOperationHelper.writeSuccessful(bytes);

  /**
   * Best effort abort of the multipart upload; sets
   * the field to null afterwards.
   * @return any exception caught during the operation.
   */
  private synchronized IOException maybeAbortMultipart() {
    if (multiPartUpload != null) {
      final IOException ioe = multiPartUpload.abort();
      multiPartUpload = null;
      return ioe;
    } else {
      return null;
    }
  }

  /**
   * Abort any active uploads, enter closed state.
   * @return the outcome
   */
  @Override
  public AbortableResult abort() {
    if (closed.getAndSet(true)) {
      // already closed
      LOG.debug("Ignoring abort() as stream is already closed");
      return new AbortableResultImpl(true, null);
    }
    try (DurationTracker d =
        statistics.trackDuration(INVOCATION_ABORT.getSymbol())) {
      return new AbortableResultImpl(false, maybeAbortMultipart());
    } finally {
      cleanupOnClose();
    }
  }

  /**
   * Abortable result.
   */
  private static final class AbortableResultImpl implements AbortableResult {

    /**
     * Had the stream already been closed/aborted?
     */
    private final boolean alreadyClosed;

    /**
     * Was any exception raised during non-essential
     * cleanup actions (i.e. MPU abort)?
     */
    private final IOException anyCleanupException;

    /**
     * Constructor.
     * @param alreadyClosed Had the stream already been closed/aborted?
     * @param anyCleanupException Was any exception raised during cleanup?
     */
    private AbortableResultImpl(final boolean alreadyClosed,
        final IOException anyCleanupException) {
      this.alreadyClosed = alreadyClosed;
      this.anyCleanupException = anyCleanupException;
    }

    @Override
    public boolean alreadyClosed() {
      return alreadyClosed;
    }

    @Override
    public IOException anyCleanupException() {
      return anyCleanupException;
    }

    @Override
    public String toString() {
      return new StringJoiner(", ",
          AbortableResultImpl.class.getSimpleName() + "[", "]")
          .add("alreadyClosed=" + alreadyClosed)
          .add("anyCleanupException=" + anyCleanupException)
          .toString();
    }
  }

  /**
@@ -548,6 +643,10 @@ public boolean hasCapability(String capability) {
    case StreamCapabilities.IOSTATISTICS:
      return true;

    // S3A supports abort.
    case StreamCapabilities.ABORTABLE_STREAM:
      return true;

    default:
      return false;
    }
@@ -756,35 +855,43 @@ private void complete(List<PartETag> partETags)
      maybeRethrowUploadFailure();
      AtomicInteger errorCount = new AtomicInteger(0);
      try {
        trackDurationOfInvocation(statistics,
            MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> {
              writeOperationHelper.completeMPUwithRetries(key,
                  uploadId,
                  partETags,
                  bytesSubmitted,
                  errorCount);
            });
      } finally {
        statistics.exceptionInMultipartComplete(errorCount.get());
      }
    }

    /**
     * Abort a multi-part upload. Retries are attempted on failures.
     * Abort a multi-part upload. Retries are not attempted on failures.
     * IOExceptions are caught; this is expected to be run as a cleanup process.
     * @return any caught exception.
     */
    public void abort() {
    private IOException abort() {
      LOG.debug("Aborting upload");
      fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
      cancelAllActiveFutures();
      try {
        trackDurationOfInvocation(statistics,
            OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
              cancelAllActiveFutures();
              writeOperationHelper.abortMultipartUpload(key, uploadId,
                  (text, e, r, i) -> statistics.exceptionInMultipartAbort());
                  false, null);
            });
        return null;
      } catch (IOException e) {
        // this point is only reached if the operation failed more than
        // the allowed retry count
        LOG.warn("Unable to abort multipart upload,"
            + " you may need to purge uploaded parts", e);
        statistics.exceptionInMultipartAbort();
        return e;
      }
    }

  }

  /**
@@ -4724,6 +4724,7 @@ public boolean hasPathCapability(final Path path, final String capability)
      return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
          ETAG_CHECKSUM_ENABLED_DEFAULT);

    case CommonPathCapabilities.ABORTABLE_STREAM:
    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
      return true;

@@ -1350,7 +1350,11 @@ private OutputStreamStatistics(
          .withGauges(
              STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
              STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
          .withDurationTracking(ACTION_EXECUTOR_ACQUIRED)
          .withDurationTracking(
              ACTION_EXECUTOR_ACQUIRED,
              INVOCATION_ABORT.getSymbol(),
              OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(),
              MULTIPART_UPLOAD_COMPLETED.getSymbol())
          .build();
      setIOStatistics(st);
      // these are extracted to avoid lookups on heavily used counters.
@@ -88,6 +88,11 @@ public enum Statistic {
      TYPE_COUNTER),
  IGNORED_ERRORS("ignored_errors", "Errors caught and ignored",
      TYPE_COUNTER),

  INVOCATION_ABORT(
      StoreStatisticNames.OP_ABORT,
      "Calls of abort()",
      TYPE_DURATION),
  INVOCATION_COPY_FROM_LOCAL_FILE(
      StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
      "Calls of copyFromLocalFile()",
@@ -336,14 +336,17 @@ public CompleteMultipartUploadResult completeMPUwithRetries(
   * Abort a multipart upload operation.
   * @param destKey destination key of the upload
   * @param uploadId multipart operation Id
   * @param shouldRetry should failures trigger a retry?
   * @param retrying callback invoked on every retry
   * @throws IOException failure to abort
   * @throws FileNotFoundException if the abort ID is unknown
   */
  @Retries.RetryTranslated
  public void abortMultipartUpload(String destKey, String uploadId,
      Retried retrying)
      boolean shouldRetry, Retried retrying)
      throws IOException {
    if (shouldRetry) {
      // retrying option
      invoker.retry("Aborting multipart upload ID " + uploadId,
          destKey,
          true,
@@ -351,6 +354,14 @@ public void abortMultipartUpload(String destKey, String uploadId,
          () -> owner.abortMultipartUpload(
              destKey,
              uploadId));
    } else {
      // single pass attempt.
      once("Aborting multipart upload ID " + uploadId,
          destKey,
          () -> owner.abortMultipartUpload(
              destKey,
              uploadId));
    }
  }

  /**
@@ -401,7 +412,7 @@ public int abortMultipartUploadsUnderPath(String prefix)
  @Retries.RetryTranslated
  public void abortMultipartCommit(String destKey, String uploadId)
      throws IOException {
    abortMultipartUpload(destKey, uploadId, invoker.getRetryCallback());
    abortMultipartUpload(destKey, uploadId, true, invoker.getRetryCallback());
  }

  /**
@@ -154,13 +154,14 @@ CompleteMultipartUploadResult completeMPUwithRetries(
   * Abort a multipart upload operation.
   * @param destKey destination key of the upload
   * @param uploadId multipart operation Id
   * @param shouldRetry should failures trigger a retry?
   * @param retrying callback invoked on every retry
   * @throws IOException failure to abort
   * @throws FileNotFoundException if the abort ID is unknown
   */
  @Retries.RetryTranslated
  void abortMultipartUpload(String destKey, String uploadId,
      Invoker.Retried retrying)
      boolean shouldRetry, Invoker.Retried retrying)
      throws IOException;

  /**
@@ -1622,7 +1622,7 @@ private void processUploads(PrintStream out) throws IOException {
        if (mode == Mode.ABORT) {
          getFilesystem().getWriteOperationHelper()
              .abortMultipartUpload(upload.getKey(), upload.getUploadId(),
                  LOG_EVENT);
                  true, LOG_EVENT);
        }
      }
      if (mode != Mode.EXPECT || verbose) {
@@ -20,6 +20,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
@@ -32,7 +33,10 @@
import java.io.InputStream;
import java.net.URI;

import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;

/**
 * Tests small file upload functionality for
@@ -155,4 +159,51 @@ public void testMarkReset() throws Throwable {
    markAndResetDatablock(createFactory(getFileSystem()));
  }

  @Test
  public void testAbortAfterWrite() throws Throwable {
    describe("Verify abort after a write does not create a file");
    Path dest = path(getMethodName());
    FileSystem fs = getFileSystem();
    ContractTestUtils.assertHasPathCapabilities(fs, dest, ABORTABLE_STREAM);
    FSDataOutputStream stream = fs.create(dest, true);
    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
    try {
      ContractTestUtils.assertCapabilities(stream,
          new String[]{ABORTABLE_STREAM},
          null);
      stream.write(data);
      assertCompleteAbort(stream.abort());
      // second attempt is harmless
      assertNoopAbort(stream.abort());

      // the path should not exist
      ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
    } finally {
      IOUtils.closeStream(stream);
      // check the path doesn't exist "after" closing stream
      ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
    }
    // and it can be called on the stream after being closed.
    assertNoopAbort(stream.abort());
  }

  /**
   * A stream which was abort()ed after being close()d for a
   * successful write will return indicating nothing happened.
   */
  @Test
  public void testAbortAfterCloseIsHarmless() throws Throwable {
    describe("Verify abort on a closed stream is harmless "
        + "and that the result indicates that nothing happened");
    Path dest = path(getMethodName());
    FileSystem fs = getFileSystem();
    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
    try (FSDataOutputStream stream = fs.create(dest, true)) {
      stream.write(data);
      assertCompleteAbort(stream.abort());
      stream.close();
      assertNoopAbort(stream.abort());
    }
  }

}
@@ -88,7 +88,7 @@ public static void clearAnyUploads(S3AFileSystem fs, Path path) {
      while (uploads.hasNext()) {
        MultipartUpload upload = uploads.next();
        fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(),
            upload.getUploadId(), LOG_EVENT);
            upload.getUploadId(), true, LOG_EVENT);
        LOG.debug("Cleaning up upload: {} {}", upload.getKey(),
            truncatedUploadId(upload.getUploadId()));
      }
@@ -82,4 +82,30 @@ public void testWriteOperationHelperPartLimits() throws Throwable {
        () -> woh.newUploadPartRequest(key,
            "uploadId", 50000, 1024, inputStream, null, 0L));
  }

  static class StreamClosedException extends IOException {}

  @Test
  public void testStreamClosedAfterAbort() throws Exception {
    stream.abort();

    // This verification replaces testing various operations after calling
    // abort: after calling abort, stream is closed like calling close().
    intercept(IOException.class, () -> stream.checkOpen());

    // check that calling write() will call checkOpen() and throw an exception
    doThrow(new StreamClosedException()).when(stream).checkOpen();

    intercept(StreamClosedException.class,
        () -> stream.write(new byte[] {'a', 'b', 'c'}));
  }

  @Test
  public void testCallingCloseAfterCallingAbort() throws Exception {
    stream.abort();

    // This shouldn't throw IOException like calling close() multiple times.
    // This will ensure abort() can be called with try-with-resources.
    stream.close();
  }
}
@@ -25,19 +25,33 @@

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;

import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_ABORT;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
@@ -118,4 +132,83 @@ public void testCommitLimitFailure() throws Throwable {
        describedAs("commit abort count")
        .isEqualTo(initial + 1);
  }

  @Test
  public void testAbortAfterTwoPartUpload() throws Throwable {
    Path file = path(getMethodName());

    byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');

    S3AFileSystem fs = getFileSystem();
    FSDataOutputStream stream = fs.create(file, true);
    try {
      stream.write(data);

      // From testTwoPartUpload() we know closing stream will finalize uploads
      // and materialize the path. Here we call abort() to abort the upload,
      // and ensure the path is NOT available. (uploads are aborted)

      assertCompleteAbort(stream.abort());

      // the path should not exist
      assertPathDoesNotExist("upload must not have completed", file);
    } finally {
      IOUtils.closeStream(stream);
      // check the path doesn't exist "after" closing stream
      assertPathDoesNotExist("upload must not have completed", file);
    }
    verifyStreamWasAborted(fs, stream);
    // a second abort is a no-op
    assertNoopAbort(stream.abort());
  }

  @Test
  public void testAbortWhenOverwritingAFile() throws Throwable {
    Path file = path(getMethodName());

    S3AFileSystem fs = getFileSystem();
    // write the original data
    byte[] smallData = writeTextFile(fs, file, "original", true);

    // now attempt a multipart upload
    byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
    FSDataOutputStream stream = fs.create(file, true);
    try {
      ContractTestUtils.assertCapabilities(stream,
          new String[]{ABORTABLE_STREAM},
          null);
      stream.write(data);
      assertCompleteAbort(stream.abort());

      verifyFileContents(fs, file, smallData);
    } finally {
      IOUtils.closeStream(stream);
    }
    verifyFileContents(fs, file, smallData);
    verifyStreamWasAborted(fs, stream);
  }

  /**
   * Check up on the IOStatistics of the FS and stream to verify that
   * a stream was aborted, both in invocations of abort() and
   * that the multipart upload itself was aborted.
   * @param fs filesystem
   * @param stream stream
   */
  private void verifyStreamWasAborted(final S3AFileSystem fs,
      final FSDataOutputStream stream) {
    // check the stream
    final IOStatistics iostats = stream.getIOStatistics();
    final String sstr = ioStatisticsToPrettyString(iostats);
    LOG.info("IOStatistics for stream: {}", sstr);
    verifyStatisticCounterValue(iostats, INVOCATION_ABORT.getSymbol(), 1);
    verifyStatisticCounterValue(iostats,
        OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), 1);

    // now the FS.
    final IOStatistics fsIostats = fs.getIOStatistics();
    assertThatStatisticCounter(fsIostats, INVOCATION_ABORT.getSymbol())
        .isGreaterThanOrEqualTo(1);
  }
}
@@ -23,11 +23,13 @@
import java.util.List;
import java.util.stream.Collectors;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Abortable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -149,4 +151,31 @@ protected void assertStatusCode(AWSServiceIOException e, int code)
      throw e;
    }
  }

  /**
   * Assert that an abort was completely successful in that it
   * was not a no-op and no exception was raised during
   * cleanup.
   * @param result result to assert over
   */
  public static void assertCompleteAbort(
      Abortable.AbortableResult result) {
    Assertions.assertThat(result)
        .describedAs("Abort operation result %s", result)
        .matches(r -> !r.alreadyClosed())
        .matches(r -> r.anyCleanupException() == null);
  }

  /**
   * Assert that an abort was a no-op as the
   * stream had already closed/aborted.
   * @param result result to assert over
   */
  public static void assertNoopAbort(
      Abortable.AbortableResult result) {
    Assertions.assertThat(result)
        .describedAs("Abort operation result %s", result)
        .matches(r -> r.alreadyClosed());
  }
}