From bf6a660232b01642b07697a289c773ea5b97217c Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Mon, 6 Nov 2017 23:54:27 -0800 Subject: [PATCH] HADOOP-15012. Add readahead, dropbehind, and unbuffer to StreamCapabilities. Contributed by John Zhuge. --- .../apache/hadoop/fs/FSDataInputStream.java | 15 +++--- .../apache/hadoop/fs/StreamCapabilities.java | 48 ++++++++++++----- .../hadoop/fs/StreamCapabilitiesPolicy.java | 51 +++++++++++++++++++ .../site/markdown/filesystem/filesystem.md | 19 ++++--- .../apache/hadoop/hdfs/DFSInputStream.java | 16 +++++- .../apache/hadoop/hdfs/DFSOutputStream.java | 12 ++--- .../fs/azure/BlockBlobAppendStream.java | 17 ++++--- 7 files changed, 139 insertions(+), 39 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index a80279db52..08d71f16c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -38,7 +38,7 @@ public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer { + HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { /** * Map ByteBuffers that we have handed out to readers to ByteBufferPool * objects @@ -227,12 +227,15 @@ public void releaseBuffer(ByteBuffer buffer) { @Override public void unbuffer() { - try { - ((CanUnbuffer)in).unbuffer(); - } catch (ClassCastException e) { - throw new UnsupportedOperationException("this stream " + - in.getClass().getName() + " does not " + "support unbuffering."); + StreamCapabilitiesPolicy.unbuffer(in); + } + + @Override + public boolean hasCapability(String capability) { + if (in instanceof StreamCapabilities) { + return ((StreamCapabilities) in).hasCapability(capability); } + return false; } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 65aa67988a..3549cdc4fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -23,27 +23,49 @@ /** * Interface to query streams for supported capabilities. + * + * Capability strings must be in lower case. + * + * Constant strings are chosen over enums in order to allow other file systems + * to define their own capabilities. */ @InterfaceAudience.Public @InterfaceStability.Evolving public interface StreamCapabilities { + /** + * Stream hflush capability implemented by {@link Syncable#hflush()}. + */ + String HFLUSH = "hflush"; + + /** + * Stream hsync capability implemented by {@link Syncable#hsync()}. + */ + String HSYNC = "hsync"; + + /** + * Stream setReadahead capability implemented by + * {@link CanSetReadahead#setReadahead(Long)}. + */ + String READAHEAD = "in:readahead"; + + /** + * Stream setDropBehind capability implemented by + * {@link CanSetDropBehind#setDropBehind(Boolean)}. + */ + String DROPBEHIND = "dropbehind"; + + /** + * Stream unbuffer capability implemented by {@link CanUnbuffer#unbuffer()}. + */ + String UNBUFFER = "in:unbuffer"; + /** * Capabilities that a stream can support and be queried for. */ + @Deprecated enum StreamCapability { - /** - * Stream hflush capability to flush out the data in client's buffer. - * Streams with this capability implement {@link Syncable} and support - * {@link Syncable#hflush()}. - */ - HFLUSH("hflush"), - - /** - * Stream hsync capability to flush out the data in client's buffer and - * the disk device. Streams with this capability implement {@link Syncable} - * and support {@link Syncable#hsync()}. - */ - HSYNC("hsync"); + HFLUSH(StreamCapabilities.HFLUSH), + HSYNC(StreamCapabilities.HSYNC); private final String capability; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java new file mode 100644 index 0000000000..3080780dda --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Static methods to implement policies for {@link StreamCapabilities}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class StreamCapabilitiesPolicy { + /** + * Implement the policy for {@link CanUnbuffer#unbuffer()}. + * + * @param in the input stream + */ + public static void unbuffer(InputStream in) { + try { + if (in instanceof StreamCapabilities + && ((StreamCapabilities) in).hasCapability( + StreamCapabilities.UNBUFFER)) { + ((CanUnbuffer) in).unbuffer(); + } + } catch (ClassCastException e) { + throw new UnsupportedOperationException("this stream " + + in.getClass().getName() + + " claims to unbuffer but forgets to implement CanUnbuffer"); + } + } +} + diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index d25180486d..c0e2a2c66a 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -1359,7 +1359,8 @@ problems were not considered during the implementation of these loops. ## interface `StreamCapabilities` The `StreamCapabilities` provides a way to programmatically query the -capabilities that an `OutputStream` supports. +capabilities that `OutputStream`, `InputStream`, or other FileSystem class +supports. ```java public interface StreamCapabilities { @@ -1369,12 +1370,16 @@ public interface StreamCapabilities { ### `boolean hasCapability(capability)` -Return true if the `OutputStream` has the desired capability. +Return true if the `OutputStream`, `InputStream`, or other FileSystem class +has the desired capability. The caller can query the capabilities of a stream using a string value. -It currently supports to query: +Here is a table of possible string values: - * `StreamCapabilties.HFLUSH` ("*hflush*"): the capability to flush out the data - in client's buffer. - * `StreamCapabilities.HSYNC` ("*hsync*"): capability to flush out the data in - client's buffer and the disk device. +String | Constant | Implements | Description +-------------|------------|------------------|------------------------------- +hflush | HFLUSH | Syncable | Flush out the data in client's user buffer. After the return of this call, new readers will see the data. +hsync | HSYNC | Syncable | Flush out the data in client's user buffer all the way to the disk device (but the disk may have it in its cache). Similar to POSIX fsync. +in:readahead | READAHEAD | CanSetReadahead | Set the readahead on the input stream. +dropbehind | DROPBEHIND | CanSetDropBehind | Drop the cache. +in:unbuffer | UNBUFFER | CanUnbuffer | Reduce the buffering on the input stream. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index e3d7adecba..d3d6669fca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; @@ -81,6 +82,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.IdentityHashStore; import org.apache.hadoop.util.StopWatch; +import org.apache.hadoop.util.StringUtils; import org.apache.htrace.core.SpanId; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; @@ -96,7 +98,7 @@ @InterfaceAudience.Private public class DFSInputStream extends FSInputStream implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer { + HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; @@ -1779,4 +1781,16 @@ public synchronized void releaseBuffer(ByteBuffer buffer) { public synchronized void unbuffer() { closeCurrentBlockReaders(); } + + @Override + public boolean hasCapability(String capability) { + switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.READAHEAD: + case StreamCapabilities.DROPBEHIND: + case StreamCapabilities.UNBUFFER: + return true; + default: + return false; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 83f1425a43..784979615f 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH; -import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -69,6 +66,7 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.htrace.core.TraceScope; import org.slf4j.Logger; @@ -552,11 +550,13 @@ void endBlock() throws IOException { @Override public boolean hasCapability(String capability) { - if (capability.equalsIgnoreCase(HSYNC.getValue()) || - capability.equalsIgnoreCase((HFLUSH.getValue()))) { + switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: return true; + default: + return false; } - return false; } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index 84342cdab1..9a8530826e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.UUID; import java.util.Random; import java.util.concurrent.ConcurrentLinkedDeque; @@ -63,9 +64,6 @@ import com.microsoft.azure.storage.blob.BlockListingFilter; import com.microsoft.azure.storage.blob.BlockSearchMode; -import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH; -import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC; - /** * Stream object that implements append for Block Blobs in WASB. * @@ -550,9 +548,16 @@ public void hflush() throws IOException { */ @Override public boolean hasCapability(String capability) { - return compactionEnabled - && (capability.equalsIgnoreCase(HSYNC.getValue()) - || capability.equalsIgnoreCase((HFLUSH.getValue()))); + if (!compactionEnabled) { + return false; + } + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: + return true; + default: + return false; + } } /**