HADOOP-16548 : Disable Flush() over config
This commit is contained in:
parent
d8313b2274
commit
c0edc848a8
@ -146,6 +146,10 @@ public class AbfsConfiguration{
|
|||||||
DefaultValue = DEFAULT_ENABLE_FLUSH)
|
DefaultValue = DEFAULT_ENABLE_FLUSH)
|
||||||
private boolean enableFlush;
|
private boolean enableFlush;
|
||||||
|
|
||||||
|
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH,
|
||||||
|
DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH)
|
||||||
|
private boolean disableOutputStreamFlush;
|
||||||
|
|
||||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
|
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
|
||||||
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
|
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
|
||||||
private boolean enableAutoThrottling;
|
private boolean enableAutoThrottling;
|
||||||
@ -427,6 +431,10 @@ public boolean isFlushEnabled() {
|
|||||||
return this.enableFlush;
|
return this.enableFlush;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isOutputStreamFlushDisabled() {
|
||||||
|
return this.disableOutputStreamFlush;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isAutoThrottlingEnabled() {
|
public boolean isAutoThrottlingEnabled() {
|
||||||
return this.enableAutoThrottling;
|
return this.enableAutoThrottling;
|
||||||
}
|
}
|
||||||
@ -635,4 +643,10 @@ void setWriteBufferSize(int bufferSize) {
|
|||||||
void setEnableFlush(boolean enableFlush) {
|
void setEnableFlush(boolean enableFlush) {
|
||||||
this.enableFlush = enableFlush;
|
this.enableFlush = enableFlush;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setDisableOutputStreamFlush(boolean disableOutputStreamFlush) {
|
||||||
|
this.disableOutputStreamFlush = disableOutputStreamFlush;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -362,7 +362,8 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
|
|||||||
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
|
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
|
||||||
0,
|
0,
|
||||||
abfsConfiguration.getWriteBufferSize(),
|
abfsConfiguration.getWriteBufferSize(),
|
||||||
abfsConfiguration.isFlushEnabled());
|
abfsConfiguration.isFlushEnabled(),
|
||||||
|
abfsConfiguration.isOutputStreamFlushDisabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
|
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
|
||||||
@ -434,7 +435,8 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
|
|||||||
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
|
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
|
||||||
offset,
|
offset,
|
||||||
abfsConfiguration.getWriteBufferSize(),
|
abfsConfiguration.getWriteBufferSize(),
|
||||||
abfsConfiguration.isFlushEnabled());
|
abfsConfiguration.isFlushEnabled(),
|
||||||
|
abfsConfiguration.isOutputStreamFlushDisabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void rename(final Path source, final Path destination) throws
|
public void rename(final Path source, final Path destination) throws
|
||||||
|
@ -51,7 +51,15 @@ public final class ConfigurationKeys {
|
|||||||
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
|
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
|
||||||
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
|
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
|
||||||
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
|
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
|
||||||
|
/** Provides a config control to enable or disable ABFS Flush operations -
|
||||||
|
* HFlush and HSync. Default is true. **/
|
||||||
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
|
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
|
||||||
|
/** Provides a config control to disable or enable OutputStream Flush API
|
||||||
|
* operations in AbfsOutputStream. Flush() will trigger actions that
|
||||||
|
* guarantee that buffered data is persistent with a perf cost while the API
|
||||||
|
* documentation does not have such expectations of data being persisted.
|
||||||
|
* Default value of this config is true. **/
|
||||||
|
public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
|
||||||
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
|
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
|
||||||
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
|
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
|
||||||
public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
|
public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
|
||||||
|
@ -57,6 +57,7 @@ public final class FileSystemConfigurations {
|
|||||||
|
|
||||||
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
|
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
|
||||||
public static final boolean DEFAULT_ENABLE_FLUSH = true;
|
public static final boolean DEFAULT_ENABLE_FLUSH = true;
|
||||||
|
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
|
||||||
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
|
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
|
||||||
|
|
||||||
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
|
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
|
||||||
|
@ -52,6 +52,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|||||||
private long position;
|
private long position;
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private boolean supportFlush;
|
private boolean supportFlush;
|
||||||
|
private boolean disableOutputStreamFlush;
|
||||||
private volatile IOException lastError;
|
private volatile IOException lastError;
|
||||||
|
|
||||||
private long lastFlushOffset;
|
private long lastFlushOffset;
|
||||||
@ -80,12 +81,14 @@ public AbfsOutputStream(
|
|||||||
final String path,
|
final String path,
|
||||||
final long position,
|
final long position,
|
||||||
final int bufferSize,
|
final int bufferSize,
|
||||||
final boolean supportFlush) {
|
final boolean supportFlush,
|
||||||
|
final boolean disableOutputStreamFlush) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.path = path;
|
this.path = path;
|
||||||
this.position = position;
|
this.position = position;
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
this.supportFlush = supportFlush;
|
this.supportFlush = supportFlush;
|
||||||
|
this.disableOutputStreamFlush = disableOutputStreamFlush;
|
||||||
this.lastError = null;
|
this.lastError = null;
|
||||||
this.lastFlushOffset = 0;
|
this.lastFlushOffset = 0;
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
@ -199,7 +202,7 @@ private void maybeThrowLastError() throws IOException {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
if (supportFlush) {
|
if (!disableOutputStreamFlush) {
|
||||||
flushInternalAsync();
|
flushInternalAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -643,6 +643,23 @@ Consult the javadocs for `org.apache.hadoop.fs.azurebfs.constants.ConfigurationK
|
|||||||
`org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list
|
`org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list
|
||||||
of configuration options and their default values.
|
of configuration options and their default values.
|
||||||
|
|
||||||
|
### <a name="flushconfigoptions"></a> Flush Options
|
||||||
|
|
||||||
|
#### <a name="abfsflushconfigoptions"></a> 1. Azure Blob File System Flush Options
|
||||||
|
Config `fs.azure.enable.flush` provides an option to render ABFS flush APIs -
|
||||||
|
HFlush() and HSync() to be no-op. By default, this
|
||||||
|
config will be set to true.
|
||||||
|
|
||||||
|
Both the APIs will ensure that data is persisted.
|
||||||
|
|
||||||
|
#### <a name="outputstreamflushconfigoptions"></a> 2. OutputStream Flush Options
|
||||||
|
Config `fs.azure.disable.outputstream.flush` provides an option to render
|
||||||
|
OutputStream Flush() API to be a no-op in AbfsOutputStream. By default, this
|
||||||
|
config will be set to true.
|
||||||
|
|
||||||
|
Hflush() being the only documented API that can provide persistent data
|
||||||
|
transfer, Flush() also attempting to persist buffered data will lead to
|
||||||
|
performance issues.
|
||||||
|
|
||||||
## <a name="troubleshooting"></a> Troubleshooting
|
## <a name="troubleshooting"></a> Troubleshooting
|
||||||
|
|
||||||
|
@ -208,43 +208,44 @@ public Void call() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFlushWithFlushEnabled() throws Exception {
|
public void testFlushWithOutputStreamFlushEnabled() throws Exception {
|
||||||
testFlush(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFlushWithFlushDisabled() throws Exception {
|
|
||||||
testFlush(false);
|
testFlush(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testFlush(boolean flushEnabled) throws Exception {
|
@Test
|
||||||
|
public void testFlushWithOutputStreamFlushDisabled() throws Exception {
|
||||||
|
testFlush(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testFlush(boolean disableOutputStreamFlush) throws Exception {
|
||||||
final AzureBlobFileSystem fs = (AzureBlobFileSystem) getFileSystem();
|
final AzureBlobFileSystem fs = (AzureBlobFileSystem) getFileSystem();
|
||||||
|
|
||||||
// Simulate setting "fs.azure.enable.flush" to true or false
|
// Simulate setting "fs.azure.disable.outputstream.flush" to true or false
|
||||||
fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(flushEnabled);
|
fs.getAbfsStore().getAbfsConfiguration()
|
||||||
|
.setDisableOutputStreamFlush(disableOutputStreamFlush);
|
||||||
|
|
||||||
final Path testFilePath = path(methodName.getMethodName());
|
final Path testFilePath = path(methodName.getMethodName());
|
||||||
byte[] buffer = getRandomBytesArray();
|
byte[] buffer = getRandomBytesArray();
|
||||||
|
|
||||||
// The test case must write "fs.azure.write.request.size" bytes
|
// The test case must write "fs.azure.write.request.size" bytes
|
||||||
// to the stream in order for the data to be uploaded to storage.
|
// to the stream in order for the data to be uploaded to storage.
|
||||||
assertEquals(
|
assertEquals(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(),
|
||||||
fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(),
|
|
||||||
buffer.length);
|
buffer.length);
|
||||||
|
|
||||||
try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
||||||
stream.write(buffer);
|
stream.write(buffer);
|
||||||
|
|
||||||
// Write asynchronously uploads data, so we must wait for completion
|
// Write asynchronously uploads data, so we must wait for completion
|
||||||
AbfsOutputStream abfsStream = (AbfsOutputStream) stream.getWrappedStream();
|
AbfsOutputStream abfsStream = (AbfsOutputStream) stream
|
||||||
|
.getWrappedStream();
|
||||||
abfsStream.waitForPendingUploads();
|
abfsStream.waitForPendingUploads();
|
||||||
|
|
||||||
// Flush commits the data so it can be read.
|
// Flush commits the data so it can be read.
|
||||||
stream.flush();
|
stream.flush();
|
||||||
|
|
||||||
// Verify that the data can be read if flushEnabled is true; and otherwise
|
// Verify that the data can be read if disableOutputStreamFlush is
|
||||||
// cannot be read.
|
// false; and otherwise cannot be read.
|
||||||
validate(fs.open(testFilePath), buffer, flushEnabled);
|
validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user