diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 19b1a906ec..723a25cadb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -146,6 +146,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_FLUSH) private boolean enableFlush; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH, + DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH) + private boolean disableOutputStreamFlush; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING, DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING) private boolean enableAutoThrottling; @@ -427,6 +431,10 @@ public boolean isFlushEnabled() { return this.enableFlush; } + public boolean isOutputStreamFlushDisabled() { + return this.disableOutputStreamFlush; + } + public boolean isAutoThrottlingEnabled() { return this.enableAutoThrottling; } @@ -635,4 +643,10 @@ void setWriteBufferSize(int bufferSize) { void setEnableFlush(boolean enableFlush) { this.enableFlush = enableFlush; } + + @VisibleForTesting + void setDisableOutputStreamFlush(boolean disableOutputStreamFlush) { + this.disableOutputStreamFlush = disableOutputStreamFlush; + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 6b2d1966c1..7f1bf103c2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -362,7 +362,8 @@ public OutputStream 
createFile(final Path path, final boolean overwrite, final F AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled()); + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); } public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) @@ -434,7 +435,8 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), offset, abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled()); + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); } public void rename(final Path source, final Path destination) throws diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 8cd86bf929..cd86f184fc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -51,7 +51,15 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; + /** Provides a config control to enable or disable ABFS Flush operations - + * HFlush and HSync. Default is true. **/ public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; + /** Provides a config control to disable or enable OutputStream Flush API + * operations in AbfsOutputStream. 
Flush() will trigger actions that + * guarantee buffered data is persisted, at a performance cost, even though + * the OutputStream API documentation does not require data to be persisted. + * Default value of this config is true. **/ + public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush"; public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix"; public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode"; public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f0c33eebef..e0c355a07b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -57,6 +57,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final boolean DEFAULT_ENABLE_FLUSH = true; + public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index f2f0a45f4d..fd56eb0a01 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -52,6 +52,7 @@ public 
class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private long position; private boolean closed; private boolean supportFlush; + private boolean disableOutputStreamFlush; private volatile IOException lastError; private long lastFlushOffset; @@ -80,12 +81,14 @@ public AbfsOutputStream( final String path, final long position, final int bufferSize, - final boolean supportFlush) { + final boolean supportFlush, + final boolean disableOutputStreamFlush) { this.client = client; this.path = path; this.position = position; this.closed = false; this.supportFlush = supportFlush; + this.disableOutputStreamFlush = disableOutputStreamFlush; this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; @@ -199,7 +202,7 @@ private void maybeThrowLastError() throws IOException { */ @Override public void flush() throws IOException { - if (supportFlush) { + if (!disableOutputStreamFlush) { flushInternalAsync(); } } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 02a62c855f..c5bad77031 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -643,6 +643,23 @@ Consult the javadocs for `org.apache.hadoop.fs.azurebfs.constants.ConfigurationK `org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list of configuration options and their default values. +### Flush Options + +#### 1. Azure Blob File System Flush Options +Config `fs.azure.enable.flush` provides an option to render ABFS flush APIs - + HFlush() and HSync() to be no-op. By default, this +config will be set to true. + +Both the APIs will ensure that data is persisted. + +#### 2. OutputStream Flush Options +Config `fs.azure.disable.outputstream.flush` provides an option to render +OutputStream Flush() API to be a no-op in AbfsOutputStream. By default, this +config will be set to true. 
+ +HFlush() is the only documented API that guarantees data is persisted; +having Flush() also attempt to persist buffered data leads to +performance issues. ## Troubleshooting diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index d60cae868f..60f7f7d23f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -208,43 +208,44 @@ public Void call() throws Exception { } @Test - public void testFlushWithFlushEnabled() throws Exception { - testFlush(true); - } - - @Test - public void testFlushWithFlushDisabled() throws Exception { + public void testFlushWithOutputStreamFlushEnabled() throws Exception { testFlush(false); } - private void testFlush(boolean flushEnabled) throws Exception { + @Test + public void testFlushWithOutputStreamFlushDisabled() throws Exception { + testFlush(true); + } + + private void testFlush(boolean disableOutputStreamFlush) throws Exception { final AzureBlobFileSystem fs = (AzureBlobFileSystem) getFileSystem(); - // Simulate setting "fs.azure.enable.flush" to true or false - fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(flushEnabled); + // Simulate setting "fs.azure.disable.outputstream.flush" to true or false + fs.getAbfsStore().getAbfsConfiguration() + .setDisableOutputStreamFlush(disableOutputStreamFlush); final Path testFilePath = path(methodName.getMethodName()); byte[] buffer = getRandomBytesArray(); // The test case must write "fs.azure.write.request.size" bytes // to the stream in order for the data to be uploaded to storage. 
- assertEquals( - fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(), + assertEquals(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(), buffer.length); try (FSDataOutputStream stream = fs.create(testFilePath)) { stream.write(buffer); // Write asynchronously uploads data, so we must wait for completion - AbfsOutputStream abfsStream = (AbfsOutputStream) stream.getWrappedStream(); + AbfsOutputStream abfsStream = (AbfsOutputStream) stream + .getWrappedStream(); abfsStream.waitForPendingUploads(); // Flush commits the data so it can be read. stream.flush(); - // Verify that the data can be read if flushEnabled is true; and otherwise - // cannot be read. - validate(fs.open(testFilePath), buffer, flushEnabled); + // Verify that the data can be read if disableOutputStreamFlush is + // false; and otherwise cannot be read. + validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush); } }