HADOOP-17597. Optionally downgrade on S3A Syncable calls (#2801)
Followup to HADOOP-13327, which changed S3A output stream hsync/hflush calls to raise an exception. Adds a new option fs.s3a.downgrade.syncable.exceptions When true, calls to Syncable hsync/hflush on S3A output streams will log once at warn (for entire process life, not just the stream), then increment IOStats with the relevant operation counter With the downgrade option false (default) * IOStats are incremented * The UnsupportedOperationException current raised includes a link to the JIRA. Contributed by Steve Loughran.
This commit is contained in:
parent
6800b21e3b
commit
027c8fb257
@ -84,6 +84,12 @@ public final class StoreStatisticNames {
|
||||
/** {@value}. */
|
||||
public static final String OP_IS_FILE = "op_is_file";
|
||||
|
||||
/** {@value}. */
|
||||
public static final String OP_HFLUSH = "op_hflush";
|
||||
|
||||
/** {@value}. */
|
||||
public static final String OP_HSYNC = "op_hsync";
|
||||
|
||||
/** {@value}. */
|
||||
public static final String OP_IS_DIRECTORY = "op_is_directory";
|
||||
|
||||
|
@ -329,7 +329,6 @@ private Constants() {
|
||||
* Default is {@link #FAST_UPLOAD_BUFFER_DISK}
|
||||
* Value: {@value}
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String FAST_UPLOAD_BUFFER =
|
||||
"fs.s3a.fast.upload.buffer";
|
||||
|
||||
@ -338,26 +337,22 @@ private Constants() {
|
||||
* Capacity is limited to available disk space.
|
||||
*/
|
||||
|
||||
@InterfaceStability.Unstable
|
||||
public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
|
||||
|
||||
/**
|
||||
* Use an in-memory array. Fast but will run of heap rapidly: {@value}.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
|
||||
|
||||
/**
|
||||
* Use a byte buffer. May be more memory efficient than the
|
||||
* {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
|
||||
|
||||
/**
|
||||
* Default buffer option: {@value}.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String DEFAULT_FAST_UPLOAD_BUFFER =
|
||||
FAST_UPLOAD_BUFFER_DISK;
|
||||
|
||||
@ -370,7 +365,6 @@ private Constants() {
|
||||
* <p>
|
||||
* Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS}
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String FAST_UPLOAD_ACTIVE_BLOCKS =
|
||||
"fs.s3a.fast.upload.active.blocks";
|
||||
|
||||
@ -378,9 +372,23 @@ private Constants() {
|
||||
* Limit of queued block upload operations before writes
|
||||
* block. Value: {@value}
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
|
||||
|
||||
/**
|
||||
* Rather than raise an exception when an attempt is
|
||||
* made to call the Syncable APIs, warn and downgrade.
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String DOWNGRADE_SYNCABLE_EXCEPTIONS =
|
||||
"fs.s3a.downgrade.syncable.exceptions";
|
||||
|
||||
/**
|
||||
* Default value for syncable invocation.
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final boolean DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT =
|
||||
false;
|
||||
|
||||
/**
|
||||
* The capacity of executor queues for operations other than block
|
||||
* upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.
|
||||
|
@ -55,6 +55,7 @@
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
import org.apache.hadoop.fs.s3a.commit.PutTracker;
|
||||
import org.apache.hadoop.fs.s3a.impl.LogExactlyOnce;
|
||||
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
@ -62,10 +63,10 @@
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
||||
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
|
||||
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
|
||||
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
|
||||
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
||||
|
||||
@ -89,10 +90,8 @@ class S3ABlockOutputStream extends OutputStream implements
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(S3ABlockOutputStream.class);
|
||||
|
||||
private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable";
|
||||
|
||||
/** Owner FileSystem. */
|
||||
private final S3AFileSystem fs;
|
||||
private static final String E_NOT_SYNCABLE =
|
||||
"S3A streams are not Syncable. See HADOOP-17597.";
|
||||
|
||||
/** Object being uploaded. */
|
||||
private final String key;
|
||||
@ -136,62 +135,48 @@ class S3ABlockOutputStream extends OutputStream implements
|
||||
/**
|
||||
* Write operation helper; encapsulation of the filesystem operations.
|
||||
*/
|
||||
private final WriteOperationHelper writeOperationHelper;
|
||||
private final WriteOperations writeOperationHelper;
|
||||
|
||||
/**
|
||||
* Track multipart put operation.
|
||||
*/
|
||||
private final PutTracker putTracker;
|
||||
|
||||
/** Should Syncable calls be downgraded? */
|
||||
private final boolean downgradeSyncableExceptions;
|
||||
|
||||
/**
|
||||
* Downagraded syncable API calls are only logged at warn
|
||||
* once across the entire process.
|
||||
*/
|
||||
private static final LogExactlyOnce WARN_ON_SYNCABLE =
|
||||
new LogExactlyOnce(LOG);
|
||||
|
||||
/**
|
||||
* An S3A output stream which uploads partitions in a separate pool of
|
||||
* threads; different {@link S3ADataBlocks.BlockFactory}
|
||||
* instances can control where data is buffered.
|
||||
*
|
||||
* @param fs S3AFilesystem
|
||||
* @param key S3 object to work on.
|
||||
* @param executorService the executor service to use to schedule work
|
||||
* @param progress report progress in order to prevent timeouts. If
|
||||
* this object implements {@code ProgressListener} then it will be
|
||||
* directly wired up to the AWS client, so receive detailed progress
|
||||
* information.
|
||||
* @param blockSize size of a single block.
|
||||
* @param blockFactory factory for creating stream destinations
|
||||
* @param statistics stats for this stream
|
||||
* @param writeOperationHelper state of the write operation.
|
||||
* @param putTracker put tracking for commit support
|
||||
* @throws IOException on any problem
|
||||
*/
|
||||
S3ABlockOutputStream(S3AFileSystem fs,
|
||||
String key,
|
||||
ExecutorService executorService,
|
||||
Progressable progress,
|
||||
long blockSize,
|
||||
S3ADataBlocks.BlockFactory blockFactory,
|
||||
BlockOutputStreamStatistics statistics,
|
||||
WriteOperationHelper writeOperationHelper,
|
||||
PutTracker putTracker)
|
||||
S3ABlockOutputStream(BlockOutputStreamBuilder builder)
|
||||
throws IOException {
|
||||
this.fs = fs;
|
||||
this.key = key;
|
||||
this.blockFactory = blockFactory;
|
||||
this.blockSize = (int) blockSize;
|
||||
this.statistics = statistics != null
|
||||
? statistics
|
||||
: EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
|
||||
builder.validate();
|
||||
this.key = builder.key;
|
||||
this.blockFactory = builder.blockFactory;
|
||||
this.blockSize = (int) builder.blockSize;
|
||||
this.statistics = builder.statistics;
|
||||
// test instantiations may not provide statistics;
|
||||
this.iostatistics = statistics != null
|
||||
? statistics.getIOStatistics()
|
||||
: emptyStatistics();
|
||||
this.writeOperationHelper = writeOperationHelper;
|
||||
this.putTracker = putTracker;
|
||||
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
|
||||
"Block size is too small: %d", blockSize);
|
||||
this.executorService = MoreExecutors.listeningDecorator(executorService);
|
||||
this.iostatistics = statistics.getIOStatistics();
|
||||
this.writeOperationHelper = builder.writeOperations;
|
||||
this.putTracker = builder.putTracker;
|
||||
this.executorService = MoreExecutors.listeningDecorator(
|
||||
builder.executorService);
|
||||
this.multiPartUpload = null;
|
||||
final Progressable progress = builder.progress;
|
||||
this.progressListener = (progress instanceof ProgressListener) ?
|
||||
(ProgressListener) progress
|
||||
: new ProgressableListener(progress);
|
||||
downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
|
||||
// create that first block. This guarantees that an open + close sequence
|
||||
// writes a 0-byte entry.
|
||||
createBlockIfNeeded();
|
||||
@ -597,7 +582,7 @@ public String toString() {
|
||||
}
|
||||
|
||||
private void incrementWriteOperations() {
|
||||
fs.incrementWriteOperations();
|
||||
writeOperationHelper.incrementWriteOperations();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -654,12 +639,31 @@ public boolean hasCapability(String capability) {
|
||||
|
||||
@Override
|
||||
public void hflush() throws IOException {
|
||||
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
|
||||
statistics.hflushInvoked();
|
||||
handleSyncableInvocation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hsync() throws IOException {
|
||||
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
|
||||
statistics.hsyncInvoked();
|
||||
handleSyncableInvocation();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shared processing of Syncable operation reporting/downgrade.
|
||||
*/
|
||||
private void handleSyncableInvocation() {
|
||||
final UnsupportedOperationException ex
|
||||
= new UnsupportedOperationException(E_NOT_SYNCABLE);
|
||||
if (!downgradeSyncableExceptions) {
|
||||
throw ex;
|
||||
}
|
||||
// downgrading.
|
||||
WARN_ON_SYNCABLE.warn("Application invoked the Syncable API against"
|
||||
+ " stream writing to {}. This is unsupported",
|
||||
key);
|
||||
// and log at debug
|
||||
LOG.debug("Downgrading Syncable call", ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -982,4 +986,166 @@ public void progressChanged(ProgressEvent progressEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a builder.
|
||||
* @return
|
||||
*/
|
||||
public static BlockOutputStreamBuilder builder() {
|
||||
return new BlockOutputStreamBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class for constructing an output stream.
|
||||
*/
|
||||
public static final class BlockOutputStreamBuilder {
|
||||
|
||||
/** S3 object to work on. */
|
||||
private String key;
|
||||
|
||||
/** The executor service to use to schedule work. */
|
||||
private ExecutorService executorService;
|
||||
|
||||
/**
|
||||
* Report progress in order to prevent timeouts.
|
||||
* this object implements {@code ProgressListener} then it will be
|
||||
* directly wired up to the AWS client, so receive detailed progress
|
||||
* information.
|
||||
*/
|
||||
private Progressable progress;
|
||||
|
||||
/** The size of a single block. */
|
||||
private long blockSize;
|
||||
|
||||
/** The factory for creating stream destinations. */
|
||||
private S3ADataBlocks.BlockFactory blockFactory;
|
||||
|
||||
/** The output statistics for the stream. */
|
||||
private BlockOutputStreamStatistics statistics =
|
||||
EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
|
||||
|
||||
/** Operations to write data. */
|
||||
private WriteOperations writeOperations;
|
||||
|
||||
/** put tracking for commit support. */
|
||||
private PutTracker putTracker;
|
||||
|
||||
/** Should Syncable calls be downgraded? */
|
||||
private boolean downgradeSyncableExceptions;
|
||||
|
||||
private BlockOutputStreamBuilder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the arguments.
|
||||
*/
|
||||
public void validate() {
|
||||
requireNonNull(key, "null key");
|
||||
requireNonNull(executorService, "null executorService");
|
||||
requireNonNull(blockFactory, "null blockFactory");
|
||||
requireNonNull(statistics, "null statistics");
|
||||
requireNonNull(writeOperations, "null writeOperationHelper");
|
||||
requireNonNull(putTracker, "null putTracker");
|
||||
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
|
||||
"Block size is too small: %s", blockSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withKey(
|
||||
final String value) {
|
||||
key = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withExecutorService(
|
||||
final ExecutorService value) {
|
||||
executorService = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withProgress(
|
||||
final Progressable value) {
|
||||
progress = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withBlockSize(
|
||||
final long value) {
|
||||
blockSize = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withBlockFactory(
|
||||
final S3ADataBlocks.BlockFactory value) {
|
||||
blockFactory = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withStatistics(
|
||||
final BlockOutputStreamStatistics value) {
|
||||
statistics = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withWriteOperations(
|
||||
final WriteOperationHelper value) {
|
||||
writeOperations = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withPutTracker(
|
||||
final PutTracker value) {
|
||||
putTracker = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set builder value.
|
||||
* @param value new value
|
||||
* @return the builder
|
||||
*/
|
||||
public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
|
||||
final boolean value) {
|
||||
downgradeSyncableExceptions = value;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1348,20 +1348,27 @@ public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
String destKey = putTracker.getDestKey();
|
||||
final BlockOutputStreamStatistics outputStreamStatistics
|
||||
= statisticsContext.newOutputStreamStatistics();
|
||||
return new FSDataOutputStream(
|
||||
new S3ABlockOutputStream(this,
|
||||
destKey,
|
||||
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
|
||||
S3ABlockOutputStream.builder()
|
||||
.withKey(destKey)
|
||||
.withBlockFactory(blockFactory)
|
||||
.withBlockSize(partSize)
|
||||
.withStatistics(outputStreamStatistics)
|
||||
.withProgress(progress)
|
||||
.withPutTracker(putTracker)
|
||||
.withWriteOperations(getWriteOperationHelper())
|
||||
.withExecutorService(
|
||||
new SemaphoredDelegatingExecutor(
|
||||
boundedThreadPool,
|
||||
blockOutputActiveBlocks,
|
||||
true,
|
||||
outputStreamStatistics),
|
||||
progress,
|
||||
partSize,
|
||||
blockFactory,
|
||||
outputStreamStatistics,
|
||||
getWriteOperationHelper(),
|
||||
putTracker),
|
||||
outputStreamStatistics))
|
||||
.withDowngradeSyncableExceptions(
|
||||
getConf().getBoolean(
|
||||
DOWNGRADE_SYNCABLE_EXCEPTIONS,
|
||||
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT));
|
||||
return new FSDataOutputStream(
|
||||
new S3ABlockOutputStream(builder),
|
||||
null);
|
||||
}
|
||||
|
||||
|
@ -1346,7 +1346,9 @@ private OutputStreamStatistics(
|
||||
STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
|
||||
STREAM_WRITE_QUEUE_DURATION.getSymbol(),
|
||||
STREAM_WRITE_TOTAL_DATA.getSymbol(),
|
||||
STREAM_WRITE_TOTAL_TIME.getSymbol())
|
||||
STREAM_WRITE_TOTAL_TIME.getSymbol(),
|
||||
INVOCATION_HFLUSH.getSymbol(),
|
||||
INVOCATION_HSYNC.getSymbol())
|
||||
.withGauges(
|
||||
STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
|
||||
STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
|
||||
@ -1489,6 +1491,16 @@ public void commitUploaded(long size) {
|
||||
incrementCounter(COMMITTER_BYTES_UPLOADED, size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hflushInvoked() {
|
||||
incCounter(INVOCATION_HFLUSH.getSymbol(), 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hsyncInvoked() {
|
||||
incCounter(INVOCATION_HSYNC.getSymbol(), 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (getBytesPendingUpload() > 0) {
|
||||
|
@ -137,6 +137,14 @@ public enum Statistic {
|
||||
StoreStatisticNames.OP_IS_FILE,
|
||||
"Calls of isFile()",
|
||||
TYPE_COUNTER),
|
||||
INVOCATION_HFLUSH(
|
||||
StoreStatisticNames.OP_HFLUSH,
|
||||
"Calls of hflush()",
|
||||
TYPE_COUNTER),
|
||||
INVOCATION_HSYNC(
|
||||
StoreStatisticNames.OP_HSYNC,
|
||||
"Calls of hsync()",
|
||||
TYPE_COUNTER),
|
||||
INVOCATION_LIST_FILES(
|
||||
StoreStatisticNames.OP_LIST_FILES,
|
||||
"Calls of listFiles()",
|
||||
|
@ -693,4 +693,9 @@ public SelectObjectContentResult select(
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWriteOperations() {
|
||||
owner.incrementWriteOperations();
|
||||
}
|
||||
}
|
||||
|
@ -338,4 +338,10 @@ SelectObjectContentResult select(
|
||||
SelectObjectContentRequest request,
|
||||
String action)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Increment the write operation counter
|
||||
* of the filesystem.
|
||||
*/
|
||||
void incrementWriteOperations();
|
||||
}
|
||||
|
@ -134,4 +134,14 @@ public interface BlockOutputStreamStatistics extends Closeable,
|
||||
* @return the value or null if no matching gauge was found.
|
||||
*/
|
||||
Long lookupGaugeValue(String name);
|
||||
|
||||
/**
|
||||
* Syncable.hflush() has been invoked.
|
||||
*/
|
||||
void hflushInvoked();
|
||||
|
||||
/**
|
||||
* Syncable.hsync() has been invoked.
|
||||
*/
|
||||
void hsyncInvoked();
|
||||
}
|
||||
|
@ -482,6 +482,14 @@ public Long lookupGaugeValue(final String name) {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hflushInvoked() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hsyncInvoked() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ Common problems working with S3 are
|
||||
|
||||
1. Classpath setup
|
||||
1. Authentication
|
||||
1. S3 Inconsistency side-effects
|
||||
1. Incorrect configuration
|
||||
|
||||
|
||||
Troubleshooting IAM Assumed Roles is covered in its
|
||||
@ -1027,7 +1027,7 @@ at the end of a write operation. If a process terminated unexpectedly, or failed
|
||||
to call the `close()` method on an output stream, the pending data will have
|
||||
been lost.
|
||||
|
||||
### File `flush()`, `hsync` and `hflush()` calls do not save data to S3
|
||||
### File `flush()` calls do not save data to S3
|
||||
|
||||
Again, this is due to the fact that the data is cached locally until the
|
||||
`close()` operation. The S3A filesystem cannot be used as a store of data
|
||||
@ -1036,6 +1036,39 @@ if it is required that the data is persisted durably after every
|
||||
This includes resilient logging, HBase-style journaling
|
||||
and the like. The standard strategy here is to save to HDFS and then copy to S3.
|
||||
|
||||
### <a name="syncable"></a> `UnsupportedOperationException` "S3A streams are not Syncable. See HADOOP-17597."
|
||||
|
||||
The application has tried to call either the `Syncable.hsync()` or `Syncable.hflush()`
|
||||
methods on an S3A output stream. This has been rejected because the
|
||||
connector isn't saving any data at all. The `Syncable` API, especially the
|
||||
`hsync()` call, are critical for applications such as HBase to safely
|
||||
persist data.
|
||||
|
||||
The S3A connector throws an `UnsupportedOperationException` when these API calls
|
||||
are made, because the guarantees absolutely cannot be met: nothing is being flushed
|
||||
or saved.
|
||||
|
||||
* Applications which intend to invoke the Syncable APIs call `hasCapability("hsync")` on
|
||||
the stream to see if they are supported.
|
||||
* Or catch and downgrade `UnsupportedOperationException`.
|
||||
|
||||
These recommendations _apply to all filesystems_.
|
||||
|
||||
To downgrade the S3A connector to simply warning of the use of
|
||||
`hsync()` or `hflush()` calls, set the option
|
||||
`fs.s3a.downgrade.syncable.exceptions` to true.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.s3a.downgrade.syncable.exceptions</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
The count of invocations of the two APIs are collected
|
||||
in the S3A filesystem Statistics/IOStatistics and so
|
||||
their use can be monitored.
|
||||
|
||||
### `RemoteFileChangedException` and read-during-overwrite
|
||||
|
||||
```
|
||||
|
@ -0,0 +1,114 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
|
||||
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HFLUSH;
|
||||
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HSYNC;
|
||||
|
||||
|
||||
public class ITestDowngradeSyncable extends AbstractS3ACostTest {
|
||||
|
||||
protected static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestDowngradeSyncable.class);
|
||||
|
||||
|
||||
public ITestDowngradeSyncable() {
|
||||
super(false, true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration createConfiguration() {
|
||||
final Configuration conf = super.createConfiguration();
|
||||
String bucketName = getTestBucketName(conf);
|
||||
removeBucketOverrides(bucketName, conf,
|
||||
DOWNGRADE_SYNCABLE_EXCEPTIONS);
|
||||
conf.setBoolean(DOWNGRADE_SYNCABLE_EXCEPTIONS, true);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHFlushDowngrade() throws Throwable {
|
||||
describe("Verify that hflush() calls can be downgraded from fail"
|
||||
+ " to ignore; the relevant counter is updated");
|
||||
Path path = methodPath();
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
final IOStatistics fsIoStats = fs.getIOStatistics();
|
||||
assertThatStatisticCounter(fsIoStats, OP_HFLUSH)
|
||||
.isEqualTo(0);
|
||||
|
||||
try (FSDataOutputStream out = fs.create(path, true)) {
|
||||
out.write('1');
|
||||
// must succeed
|
||||
out.hflush();
|
||||
// stats counter records the downgrade
|
||||
IOStatistics iostats = out.getIOStatistics();
|
||||
LOG.info("IOStats {}", ioStatisticsToString(iostats));
|
||||
assertThatStatisticCounter(iostats, OP_HFLUSH)
|
||||
.isEqualTo(1);
|
||||
assertThatStatisticCounter(iostats, OP_HSYNC)
|
||||
.isEqualTo(0);
|
||||
}
|
||||
// once closed. the FS will have its stats merged.
|
||||
assertThatStatisticCounter(fsIoStats, OP_HFLUSH)
|
||||
.isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHSyncDowngrade() throws Throwable {
|
||||
describe("Verify that hsync() calls can be downgraded from fail"
|
||||
+ " to ignore; the relevant counter is updated");
|
||||
Path path = methodPath();
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
final IOStatistics fsIoStats = fs.getIOStatistics();
|
||||
assertThatStatisticCounter(fsIoStats, OP_HSYNC)
|
||||
.isEqualTo(0);
|
||||
|
||||
try (FSDataOutputStream out = fs.create(path, true)) {
|
||||
out.write('1');
|
||||
// must succeed
|
||||
out.hsync();
|
||||
// stats counter records the downgrade
|
||||
IOStatistics iostats = out.getIOStatistics();
|
||||
LOG.info("IOStats {}", ioStatisticsToString(iostats));
|
||||
assertThatStatisticCounter(iostats, OP_HFLUSH)
|
||||
.isEqualTo(0);
|
||||
assertThatStatisticCounter(iostats, OP_HSYNC)
|
||||
.isEqualTo(1);
|
||||
}
|
||||
// once closed. the FS will have its stats merged.
|
||||
assertThatStatisticCounter(fsIoStats, OP_HSYNC)
|
||||
.isEqualTo(1);
|
||||
}
|
||||
|
||||
}
|
@ -43,8 +43,11 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
|
||||
|
||||
private S3ABlockOutputStream stream;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
/**
|
||||
* Create an S3A Builder all mocked up from component pieces.
|
||||
* @return stream builder.
|
||||
*/
|
||||
private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() {
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
Progressable progressable = mock(Progressable.class);
|
||||
S3ADataBlocks.BlockFactory blockFactory =
|
||||
@ -52,11 +55,26 @@ public void setUp() throws Exception {
|
||||
long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
|
||||
WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
|
||||
PutTracker putTracker = mock(PutTracker.class);
|
||||
stream = spy(new S3ABlockOutputStream(fs, "", executorService,
|
||||
progressable, blockSize, blockFactory, null, oHelper,
|
||||
putTracker));
|
||||
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
|
||||
S3ABlockOutputStream.builder()
|
||||
.withBlockFactory(blockFactory)
|
||||
.withBlockSize(blockSize)
|
||||
.withExecutorService(executorService)
|
||||
.withKey("")
|
||||
.withProgress(progressable)
|
||||
.withPutTracker(putTracker)
|
||||
.withWriteOperations(oHelper);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
final S3ABlockOutputStream.BlockOutputStreamBuilder
|
||||
builder = mockS3ABuilder();
|
||||
stream = spy(new S3ABlockOutputStream(builder));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testFlushNoOpWhenStreamClosed() throws Exception {
|
||||
doThrow(new IOException()).when(stream).checkOpen();
|
||||
@ -108,4 +126,31 @@ public void testCallingCloseAfterCallingAbort() throws Exception {
|
||||
// This will ensure abort() can be called with try-with-resource.
|
||||
stream.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Unless configured to downgrade, the stream will raise exceptions on
|
||||
* Syncable API calls.
|
||||
*/
|
||||
@Test
|
||||
public void testSyncableUnsupported() throws Exception {
|
||||
intercept(UnsupportedOperationException.class, () -> stream.hflush());
|
||||
intercept(UnsupportedOperationException.class, () -> stream.hsync());
|
||||
}
|
||||
|
||||
/**
|
||||
* When configured to downgrade, the stream downgrades on
|
||||
* Syncable API calls.
|
||||
*/
|
||||
@Test
|
||||
public void testSyncableDowngrade() throws Exception {
|
||||
final S3ABlockOutputStream.BlockOutputStreamBuilder
|
||||
builder = mockS3ABuilder();
|
||||
builder.withDowngradeSyncableExceptions(true);
|
||||
stream = spy(new S3ABlockOutputStream(builder));
|
||||
|
||||
stream.hflush();
|
||||
stream.hsync();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user