diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java index 6e98755e77..591c2ec50d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java @@ -29,11 +29,13 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -52,7 +54,7 @@ * An output stream that write file data to a page blob stored using ASV's * custom format. */ -final class PageBlobOutputStream extends OutputStream implements Syncable { +final class PageBlobOutputStream extends OutputStream implements Syncable, StreamCapabilities { /** * The maximum number of raw bytes Azure Storage allows us to upload in a * single request (4 MB). @@ -195,6 +197,23 @@ private void checkStreamState() throws IOException { } } + /** + * Query the stream for a specific capability. + * + * @param capability string to query the stream support for. + * @return true for hsync and hflush. + */ + @Override + public boolean hasCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: + return true; + default: + return false; + } + } + /** * Closes this output stream and releases any system resources associated with * this stream. If any data remains in the buffer it is committed to the diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java index 9ac1f73401..b8edc4b7d6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.hamcrest.core.IsEqual; import org.hamcrest.core.IsNot; import org.junit.Test; @@ -186,6 +187,20 @@ public void testPageBlobClose() throws IOException { } } + // Page Blobs have StreamCapabilities.HFLUSH and StreamCapabilities.HSYNC. + @Test + public void testPageBlobCapabilities() throws IOException { + Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); + assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); + assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); + assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); + assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + stream.write(getRandomBytes()); + } + } + // Verify flush does not write data to storage for Block Blobs @Test public void testBlockBlobFlush() throws Exception { @@ -265,6 +280,20 @@ public void testBlockBlobClose() throws IOException { } } + // Block Blobs do not have any StreamCapabilities. + @Test + public void testBlockBlobCapabilities() throws IOException { + Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH)); + assertFalse(stream.hasCapability(StreamCapabilities.HSYNC)); + assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); + assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); + assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + stream.write(getRandomBytes()); + } + } + // Verify flush writes data to storage for Block Blobs with compaction @Test public void testBlockBlobCompactionFlush() throws Exception { @@ -347,6 +376,20 @@ public void testBlockBlobCompactionClose() throws IOException { } } + // Block Blobs with Compaction have StreamCapabilities.HFLUSH and HSYNC. + @Test + public void testBlockBlobCompactionCapabilities() throws IOException { + Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); + assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); + assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); + assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); + assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + stream.write(getRandomBytes()); + } + } + // A small write does not write data to storage for Page Blobs @Test public void testPageBlobSmallWrite() throws IOException {