diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java index 553915d755..38c430fcd9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -312,10 +313,7 @@ private void freeBuffers() { @Override public boolean hasCapability(String capability) { - if (out instanceof StreamCapabilities) { - return ((StreamCapabilities) out).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(out, capability); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java index 0c5b4f0d37..59345f5d25 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.FileDescriptor; import java.io.IOException; +import java.util.StringJoiner; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -153,4 +154,12 @@ public boolean hasCapability(final String capability) { public IOStatistics getIOStatistics() { return retrieveIOStatistics(in); } + + @Override + public String toString() { + return new StringJoiner(", ", + BufferedFSInputStream.class.getSimpleName() + "[", "]") + .add("in=" + in) + .toString(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java index 2e2d98b9c5..0077838920 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java @@ -36,6 +36,6 @@ public interface CanSetDropBehind { * UnsupportedOperationException If this stream doesn't support * setting the drop-behind. */ - public void setDropBehind(Boolean dropCache) + void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index b24136bf9e..0256a58f46 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -46,6 +46,7 @@ import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; /**************************************************************** * Abstract Checksumed FileSystem. @@ -479,12 +480,15 @@ public IOStatistics getIOStatistics() { /** * Probe the inner stream for a capability. - * + * Syncable operations are rejected before being passed down. * @param capability string to query the stream support for. * @return true if a capability is known to be supported. */ @Override public boolean hasCapability(final String capability) { + if (isProbeForSyncable(capability)) { + return false; + } return datas.hasCapability(capability); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index b63e047358..b143a4cb63 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSupport; @@ -237,10 +238,7 @@ public void unbuffer() { @Override public boolean hasCapability(String capability) { - if (in instanceof StreamCapabilities) { - return ((StreamCapabilities) in).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(in, capability); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java index 27d164b7d8..add5d081e0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSupport; @@ -126,10 +127,7 @@ public OutputStream getWrappedStream() { @Override public boolean hasCapability(String capability) { - if (wrappedStream instanceof StreamCapabilities) { - return ((StreamCapabilities) wrappedStream).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(wrappedStream, capability); } @Override // Syncable diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java index 2458b2f40d..aaa19adf8c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java @@ -33,7 +33,8 @@ */ @InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceStability.Unstable -abstract public class FSOutputSummer extends OutputStream { +abstract public class FSOutputSummer extends OutputStream implements + StreamCapabilities { // data checksum private final DataChecksum sum; // internal buffer for storing data before it is checksumed @@ -254,4 +255,9 @@ protected synchronized void setChecksumBufSize(int size) { protected synchronized void resetChecksumBufSize() { setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS); } + + @Override + public boolean hasCapability(String capability) { + return false; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 2aeb17b90c..21c69b78ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -43,10 +43,12 @@ import java.util.Locale; import java.util.Optional; import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; @@ -137,8 +139,13 @@ class LocalFSFileInputStream extends FSInputStream implements STREAM_READ_SKIP_BYTES) .build(); + /** Reference to the bytes read counter for slightly faster counting. */ + private final AtomicLong bytesRead; + public LocalFSFileInputStream(Path f) throws IOException { fis = new FileInputStream(pathToFile(f)); + bytesRead = ioStatistics.getCounterReference( + STREAM_READ_BYTES); } @Override @@ -161,8 +168,8 @@ public boolean seekToNewSource(long targetPos) throws IOException { return false; } - /* - * Just forward to the fis + /** + * Just forward to the fis. */ @Override public int available() throws IOException { return fis.available(); } @@ -178,7 +185,7 @@ public int read() throws IOException { if (value >= 0) { this.position++; statistics.incrementBytesRead(1); - ioStatistics.incrementCounter(STREAM_READ_BYTES); + bytesRead.addAndGet(1); } return value; } catch (IOException e) { // unexpected exception @@ -196,7 +203,7 @@ public int read(byte[] b, int off, int len) throws IOException { if (value > 0) { this.position += value; statistics.incrementBytesRead(value); - ioStatistics.incrementCounter(STREAM_READ_BYTES, value); + bytesRead.addAndGet(value); } return value; } catch (IOException e) { // unexpected exception @@ -285,7 +292,7 @@ public FSDataInputStream open(PathHandle fd, int bufferSize) * For create()'s FSOutputStream. *********************************************************/ final class LocalFSFileOutputStream extends OutputStream implements - IOStatisticsSource, StreamCapabilities { + IOStatisticsSource, StreamCapabilities, Syncable { private FileOutputStream fos; /** @@ -354,6 +361,21 @@ public void write(int b) throws IOException { } } + @Override + public void hflush() throws IOException { + flush(); + } + + /** + * HSync calls sync on fhe file descriptor after a local flush() call. + * @throws IOException failure + */ + @Override + public void hsync() throws IOException { + flush(); + fos.getFD().sync(); + } + @Override public boolean hasCapability(String capability) { // a bit inefficient, but intended to make it easier to add @@ -362,7 +384,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.IOSTATISTICS: return true; default: - return false; + return StoreImplementationUtils.isProbeForSyncable(capability); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 15ea2ab325..29af862f94 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -34,7 +34,11 @@ public interface StreamCapabilities { /** * Stream hflush capability implemented by {@link Syncable#hflush()}. + * + * Use the {@link #HSYNC} probe to check for the support of Syncable; + * it's that presence of {@code hsync()} which matters. */ + @Deprecated String HFLUSH = "hflush"; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java index 7ec3509ce1..9cd458592c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java @@ -23,20 +23,24 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -/** This interface for flush/sync operation. */ +/** + * This is the interface for flush/sync operations. + * Consult the Hadoop filesystem specification for the definition of the + * semantics of these operations. + */ @InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceStability.Stable public interface Syncable { - + /** Flush out the data in client's user buffer. After the return of * this call, new readers will see the data. * @throws IOException if any error occurs */ - public void hflush() throws IOException; - + void hflush() throws IOException; + /** Similar to posix fsync, flush out the data in client's user buffer * all the way to the disk device (but the disk may have it in its cache). * @throws IOException if error occurs */ - public void hsync() throws IOException; + void hsync() throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java new file mode 100644 index 0000000000..605a3538d8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.hadoop.fs.impl; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StreamCapabilities; + +import static org.apache.hadoop.fs.StreamCapabilities.HFLUSH; +import static org.apache.hadoop.fs.StreamCapabilities.HSYNC; + +/** + * Utility classes to help implementing filesystems and streams. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class StoreImplementationUtils { + + private StoreImplementationUtils() { + } + + /** + * Check the probe capability being for {@link StreamCapabilities#HSYNC} + * or {@link StreamCapabilities#HFLUSH} + * {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality. + * @param capability capability string. + * @return true if either refers to one of the Syncable operations. + */ + public static boolean isProbeForSyncable(String capability) { + return capability.equalsIgnoreCase(HSYNC) || + capability.equalsIgnoreCase(HFLUSH); + } + + /** + * Probe for an object having a capability; returns true + * if the stream implements {@link StreamCapabilities} and its + * {@code hasCapabilities()} method returns true for the capability. + * This is a package private method intended to provided a common + * implementation for input and output streams. + * {@link StreamCapabilities#hasCapability(String)} call is for public use. + * @param object object to probe. + * @param capability capability to probe for + * @return true if the object implements stream capabilities and + * declares that it supports the capability. + */ + static boolean objectHasCapability(Object object, String capability) { + if (object instanceof StreamCapabilities) { + return ((StreamCapabilities) object).hasCapability(capability); + } + return false; + } + + /** + * Probe for an output stream having a capability; returns true + * if the stream implements {@link StreamCapabilities} and its + * {@code hasCapabilities()} method returns true for the capability. + * @param out output stream + * @param capability capability to probe for + * @return true if the stream declares that it supports the capability. + */ + public static boolean hasCapability(OutputStream out, String capability) { + return objectHasCapability(out, capability); + } + + /** + * Probe for an input stream having a capability; returns true + * if the stream implements {@link StreamCapabilities} and its + * {@code hasCapabilities()} method returns true for the capability. + * @param in input stream + * @param capability capability to probe for + * @return true if the stream declares that it supports the capability. + */ + public static boolean hasCapability(InputStream in, String capability) { + return objectHasCapability(in, capability); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 035c4d3b4b..433212491b 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -664,11 +664,15 @@ For instance, HDFS may raise an `InvalidPathException`. result = FSDataOutputStream -The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`. +A zero byte file MUST exist at the end of the specified path, visible to all. + +The updated (valid) FileSystem MUST contain all the parent directories of the path, as created by `mkdirs(parent(p))`. The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of `FS.Files[p]` +The behavior of the returned stream is covered in [Output](outputstream.html). + #### Implementation Notes * Some implementations split the create into a check for the file existing @@ -677,10 +681,18 @@ The result is `FSDataOutputStream`, which through its operations may generate ne clients creating files with `overwrite==true` to fail if the file is created by another client between the two tests. -* S3A, Swift and potentially other Object Stores do not currently change the FS state +* S3A, Swift and potentially other Object Stores do not currently change the `FS` state until the output stream `close()` operation is completed. -This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`, - and potentially confuse file/directory logic +This is a significant difference between the behavior of object stores +and that of filesystems, as it allows >1 client to create a file with `overwrite=false`, +and potentially confuse file/directory logic. In particular, using `create()` to acquire +an exclusive lock on a file (whoever creates the file without an error is considered +the holder of the lock) may not not a safe algorithm to use when working with object stores. + +* Object stores may create an empty file as a marker when a file is created. +However, object stores with `overwrite=true` semantics may not implement this atomically, +so creating files with `overwrite=false` cannot be used as an implicit exclusion +mechanism between processes. * The Local FileSystem raises a `FileNotFoundException` when trying to create a file over a directory, hence it is listed as an exception that MAY be raised when @@ -692,6 +704,8 @@ this precondition fails. Make a `FSDataOutputStreamBuilder` to specify the parameters to create a file. +The behavior of the returned stream is covered in [Output](outputstream.html). + #### Implementation Notes `createFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make @@ -717,17 +731,21 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep #### Postconditions - FS + FS' = FS result = FSDataOutputStream Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]` by appending data to the existing list. +The behavior of the returned stream is covered in [Output](outputstream.html). + ### `FSDataOutputStreamBuilder appendFile(Path p)` Make a `FSDataOutputStreamBuilder` to specify the parameters to append to an existing file. +The behavior of the returned stream is covered in [Output](outputstream.html). + #### Implementation Notes `appendFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md index 25cc9d13d1..aba0a44c60 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md @@ -32,6 +32,7 @@ HDFS as these are commonly expected by Hadoop client applications. 1. [Notation](notation.html) 1. [Model](model.html) 1. [FileSystem class](filesystem.html) +1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html) 1. [FSDataInputStream class](fsdatainputstream.html) 1. [PathCapabilities interface](pathcapabilities.html) 1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md new file mode 100644 index 0000000000..33d9648c78 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -0,0 +1,1012 @@ + + + + +# Output: `OutputStream`, `Syncable` and `StreamCapabilities` + +## Introduction + +This document covers the Output Streams within the context of the +[Hadoop File System Specification](index.html). + +It uses the filesystem model defined in [A Model of a Hadoop Filesystem](model.html) +with the notation defined in [notation](Notation.md). + +The target audiences are: +1. Users of the APIs. While `java.io.OutputStream` is a standard interfaces, +this document clarifies how it is implemented in HDFS and elsewhere. +The Hadoop-specific interfaces `Syncable` and `StreamCapabilities` are new; +`Syncable` is notable in offering durability and visibility guarantees which +exceed that of `OutputStream`. +1. Implementors of File Systems and clients. + +## How data is written to a filesystem + +The core mechanism to write data to files through the Hadoop FileSystem APIs +is through `OutputStream` subclasses obtained through calls to +`FileSystem.create()`, `FileSystem.append()`, +or `FSDataOutputStreamBuilder.build()`. + +These all return instances of `FSDataOutputStream`, through which data +can be written through various `write()` methods. +After a stream's `close()` method is called, all data written to the +stream MUST BE persisted to the fileysystem and visible to oll other +clients attempting to read data from that path via `FileSystem.open()`. + +As well as operations to write the data, Hadoop's `OutputStream` implementations +provide methods to flush buffered data back to the filesystem, +so as to ensure that the data is reliably persisted and/or visible +to other callers. This is done via the `Syncable` interface. It was +originally intended that the presence of this interface could be interpreted +as a guarantee that the stream supported its methods. However, this has proven +impossible to guarantee as the static nature of the interface is incompatible +with filesystems whose syncability semantics may vary on a store/path basis. +As an example, erasure coded files in HDFS do not support the Sync operations, +even though they are implemented as subclass of an output stream which is `Syncable`. + +A new interface: `StreamCapabilities`. This allows callers +to probe the exact capabilities of a stream, even transitively +through a chain of streams. + +## Output Stream Model + +For this specification, an output stream can be viewed as a list of bytes +stored in the client; `hsync()` and `hflush()` are operations the actions +which propagate the data to be visible to other readers of the file and/or +made durable. + +```python +buffer: List[byte] +``` + +A flag, `open` tracks whether the stream is open: after the stream +is closed no more data may be written to it: + +```python +open: bool +buffer: List[byte] +``` + +The destination path of the stream, `path`, can be tracked to form a triple +`path, open, buffer` + +```python +Stream = (path: Path, open: Boolean, buffer: byte[]) +``` + +#### Visibility of Flushed Data + +(Immediately) after `Syncable` operations which flush data to the filesystem, +the data at the stream's destination path MUST match that of +`buffer`. That is, the following condition MUST hold: + +```python +FS'.Files(path) == buffer +``` + +Any client reading the data at the path MUST see the new data. +The `Syncable` operations differ in their durability +guarantees, not visibility of data. + +### State of Stream and File System after `Filesystem.create()` + +The output stream returned by a `FileSystem.create(path)` or +`FileSystem.createFile(path).build()` within a filesystem `FS`, +can be modeled as a triple containing an empty array of no data: + +```python +Stream' = (path, true, []) +``` + +The filesystem `FS'` MUST contain a 0-byte file at the path: + +```python +FS' = FS where data(FS', path) == [] +``` + +Thus, the initial state of `Stream'.buffer` is implicitly +consistent with the data at the filesystem. + + +*Object Stores*: see caveats in the "Object Stores" section below. + +### State of Stream and File System after `Filesystem.append()` + +The output stream returned from a call of + `FileSystem.append(path, buffersize, progress)` within a filesystem `FS`, +can be modelled as a stream whose `buffer` is intialized to that of +the original file: + +```python +Stream' = (path, true, data(FS, path)) +``` + +#### Persisting data + +When the stream writes data back to its store, be it in any +supported flush operation, in the `close()` operation, or at any other +time the stream chooses to do so, the contents of the file +are replaced with the current buffer + +```python +Stream' = (path, true, buffer) +FS' = FS where data(FS', path) == buffer +``` + +After a call to `close()`, the stream is closed for all operations other +than `close()`; they MAY fail with `IOException` or `RuntimeException`. + +```python +Stream' = (path, false, []) +``` + +The `close()` operation MUST be idempotent with the sole attempt to write the +data made in the first invocation. + +1. If `close()` succeeds, subsequent calls are no-ops. +1. If `close()` fails, again, subsequent calls are no-ops. They MAY rethrow +the previous exception, but they MUST NOT retry the write. + + + + + +## Class `FSDataOutputStream` + +```java +public class FSDataOutputStream + extends DataOutputStream + implements Syncable, CanSetDropBehind, StreamCapabilities { + // ... +} +``` + +The `FileSystem.create()`, `FileSystem.append()` and +`FSDataOutputStreamBuilder.build()` calls return an instance +of a class `FSDataOutputStream`, a subclass of `java.io.OutputStream`. + +The base class wraps an `OutputStream` instance, one which may implement `Syncable`, +`CanSetDropBehind` and `StreamCapabilities`. + +This document covers the requirements of such implementations. + +HDFS's `FileSystem` implementation, `DistributedFileSystem`, returns an instance +of `HdfsDataOutputStream`. This implementation has at least two behaviors +which are not explicitly declared by the base Java implmentation + +1. Writes are synchronized: more than one thread can write to the same +output stream. This is a use pattern which HBase relies on. + +1. `OutputStream.flush()` is a no-op when the file is closed. Apache Druid +has made such a call on this in the past +[HADOOP-14346](https://issues.apache.org/jira/browse/HADOOP-14346). + + +As the HDFS implementation is considered the de-facto specification of +the FileSystem APIs, the fact that `write()` is thread-safe is significant. + +For compatibility, not only SHOULD other FS clients be thread-safe, +but new HDFS features, such as encryption and Erasure Coding SHOULD also +implement consistent behavior with the core HDFS output stream. + +Put differently: + +*It isn't enough for Output Streams to implement the core semantics +of `java.io.OutputStream`: they need to implement the extra semantics +of `HdfsDataOutputStream`, especially for HBase to work correctly.* + +The concurrent `write()` call is the most significant tightening of +the Java specification. + +## Class `java.io.OutputStream` + +A Java `OutputStream` allows applications to write a sequence of bytes to a destination. +In a Hadoop filesystem, that destination is the data under a path in the filesystem. + +```java +public abstract class OutputStream implements Closeable, Flushable { + public abstract void write(int b) throws IOException; + public void write(byte b[]) throws IOException; + public void write(byte b[], int off, int len) throws IOException; + public void flush() throws IOException; + public void close() throws IOException; +} +``` +### `write(Stream, data)` + +Writes a byte of data to the stream. + +#### Preconditions + +```python +Stream.open else raise ClosedChannelException, PathIOException, IOException +``` + +The exception `java.nio.channels.ClosedChannelExceptionn` is +raised in the HDFS output streams when trying to write to a closed file. +This exception does not include the destination path; and +`Exception.getMessage()` is `null`. It is therefore of limited value in stack +traces. Implementors may wish to raise exceptions with more detail, such +as a `PathIOException`. + + +#### Postconditions + +The buffer has the lower 8 bits of the data argument appended to it. + +```python +Stream'.buffer = Stream.buffer + [data & 0xff] +``` + +There may be an explicit limit on the size of cached data, or an implicit +limit based by the available capacity of the destination filesystem. +When a limit is reached, `write()` SHOULD fail with an `IOException`. + +### `write(Stream, byte[] data, int offset, int len)` + + +#### Preconditions + +The preconditions are all defined in `OutputStream.write()` + +```python +Stream.open else raise ClosedChannelException, PathIOException, IOException +data != null else raise NullPointerException +offset >= 0 else raise IndexOutOfBoundsException +len >= 0 else raise IndexOutOfBoundsException +offset < data.length else raise IndexOutOfBoundsException +offset + len < data.length else raise IndexOutOfBoundsException +``` + +After the operation has returned, the buffer may be re-used. The outcome +of updates to the buffer while the `write()` operation is in progress is undefined. + +#### Postconditions + +```python +Stream'.buffer = Stream.buffer + data[offset...(offset + len)] +``` + +### `write(byte[] data)` + +This is defined as the equivalent of: + +```python +write(data, 0, data.length) +``` + +### `flush()` + +Requests that the data is flushed. The specification of `ObjectStream.flush()` +declares that this SHOULD write data to the "intended destination". + +It explicitly precludes any guarantees about durability. + +For that reason, this document doesn't provide any normative +specifications of behaviour. + +#### Preconditions + +None. + +#### Postconditions + +None. + +If the implementation chooses to implement a stream-flushing operation, +the data may be saved to the file system such that it becomes visible to +others" + +```python +FS' = FS where data(FS', path) == buffer +``` + +When a stream is closed, `flush()` SHOULD downgrade to being a no-op, if it was not +one already. This is to work with applications and libraries which can invoke +it in exactly this way. + + +*Issue*: Should `flush()` forward to `hflush()`? + +No. Or at least, make it optional. + +There's a lot of application code which assumes that `flush()` is low cost +and should be invoked after writing every single line of output, after +writing small 4KB blocks or similar. + +Forwarding this to a full flush across a distributed filesystem, or worse, +a distant object store, is very inefficient. +Filesystem clients which convert a `flush()` to an `hflush()` will eventually +have to roll back that feature: +[HADOOP-16548](https://issues.apache.org/jira/browse/HADOOP-16548). + +### `close()` + +The `close()` operation saves all data to the filesystem and +releases any resources used for writing data. + +The `close()` call is expected to block +until the write has completed (as with `Syncable.hflush()`), possibly +until it has been written to durable storage. + +After `close()` completes, the data in a file MUST be visible and consistent +with the data most recently written. The metadata of the file MUST be consistent +with the data and the write history itself (i.e. any modification time fields +updated). + +After `close()` is invoked, all subsequent `write()` calls on the stream +MUST fail with an `IOException`. + +Any locking/leaseholding mechanism MUST release its lock/lease. + +```python +Stream'.open = false +FS' = FS where data(FS', path) == buffer +``` + +The `close()` call MAY fail during its operation. + +1. Callers of the API MUST expect for some calls to `close()` to fail and SHOULD code appropriately. +Catching and swallowing exceptions, while common, is not always the ideal solution. +1. Even after a failure, `close()` MUST place the stream into a closed state. +Follow-on calls to `close()` are ignored, and calls to other methods +rejected. That is: caller's cannot be expected to call `close()` repeatedly +until it succeeds. +1. The duration of the `close()` operation is undefined. Operations which rely +on acknowledgements from remote systems to meet the persistence guarantees +implicitly have to await these acknowledgements. Some Object Store output streams +upload the entire data file in the `close()` operation. This can take a large amount +of time. The fact that many user applications assume that `close()` is both fast +and does not fail means that this behavior is dangerous. + +Recommendations for safe use by callers + +* Do plan for exceptions being raised, either in catching and logging or +by throwing the exception further up. Catching and silently swallowing exceptions +may hide serious problems. +* Heartbeat operations SHOULD take place on a separate thread, so that a long +delay in `close()` does not block the thread so long that the heartbeat times +out. + +Implementors: + +* Have a look at [HADOOP-16785](https://issues.apache.org/jira/browse/HADOOP-16785) +to see examples of complications in close. +* Incrementally writing blocks before a close operation results in a behavior which +matches client expectations better: write failures to surface earlier and close +to be more housekeeping than the actual upload. +* If block uploads are executed in separate threads, the output stream `close()` +call MUST block until all the asynchronous uploads have completed; any error raised +MUST be reported. +If multiple errors were raised, the stream can choose which to propagate. +What is important is: when `close()` returns without an error, applications expect +the data to have been successfully written. + +### HDFS and `OutputStream.close()` + +HDFS does not immediately `sync()` the output of a written file to disk on +`OutputStream.close()` unless configured with `dfs.datanode.synconclose` +is true. This has caused [problems in some applications](https://issues.apache.org/jira/browse/ACCUMULO-1364). + +Applications which absolutely require the guarantee that a file has been persisted +MUST call `Syncable.hsync()` *before* the file is closed. + + +## `org.apache.hadoop.fs.Syncable` + +```java +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface Syncable { + + + /** Flush out the data in client's user buffer. After the return of + * this call, new readers will see the data. + * @throws IOException if any error occurs + */ + void hflush() throws IOException; + + /** Similar to posix fsync, flush out the data in client's user buffer + * all the way to the disk device (but the disk may have it in its cache). + * @throws IOException if error occurs + */ + void hsync() throws IOException; +} +``` + +The purpose of `Syncable` interface is to provide guarantees that data is written +to a filesystem for both visibility and durability. + +*SYNC-1*: An `OutputStream` which implements `Syncable` and does not raise +`UnsupportedOperationException` on invocations is +making an explicit declaration that it can meet those guarantees. + +*SYNC-2*: If a stream, declares the interface as implemented, but does not +provide durability, the interface's methods MUST raise +`UnsupportedOperationException`. + +The `Syncable` interface has been implemented by other classes than +subclasses of `OutputStream`, such as `org.apache.hadoop.io.SequenceFile.Writer`. + +*SYNC-3* The fact that a class implements `Syncable` does not guarantee +that `extends OutputStream` holds. + +That is, for any class `C`: `(C instanceof Syncable)` does not imply +`(C instanceof OutputStream)` + +This specification only covers the required behavior of `OutputStream` subclasses +which implement `Syncable`. + + +*SYNC-4:* The return value of `FileSystem.create(Path)` is an instance +of `FSDataOutputStream`. + +*SYNC-5:* `FSDataOutputStream implements Syncable` + + +SYNC-5 and SYNC-1 imply that all output streams which can be created +with `FileSystem.create(Path)` must support the semantics of `Syncable`. +This is demonstrably not true: `FSDataOutputStream` simply downgrades +to a `flush()` if its wrapped stream is not `Syncable`. +Therefore the declarations SYNC-1 and SYNC-2 do not hold: you cannot trust `Syncable`. + +Put differently: *callers MUST NOT rely on the presence of the interface +as evidence that the semantics of `Syncable` are supported*. Instead +they MUST be dynamically probed for using the `StreamCapabilities` +interface, where available. + + +### `Syncable.hflush()` + +Flush out the data in client's user buffer. After the return of +this call, new readers will see the data. The `hflush()` operation +does not contain any guarantees as to the durability of the data. only +its visibility. + +Thus implementations may cache the written data in memory +—visible to all, but not yet persisted. + +#### Preconditions + +```python +hasCapability(Stream, "hflush") +Stream.open else raise IOException +``` + + +#### Postconditions + +```python +FS' = FS where data(path) == cache +``` + + +After the call returns, the data MUST be visible to all new callers +of `FileSystem.open(path)` and `FileSystem.openFile(path).build()`. + +There is no requirement or guarantee that clients with an existing +`DataInputStream` created by a call to `(FS, path)` will see the updated +data, nor is there a guarantee that they *will not* in a current or subsequent +read. + +Implementation note: as a correct `hsync()` implementation MUST also +offer all the semantics of an `hflush()` call, implementations of `hflush()` +may just invoke `hsync()`: + +```java +public void hflush() throws IOException { + hsync(); +} +``` + +#### `hflush()` Performance + +The `hflush()` call MUST block until the store has acknowledge that the +data has been received and is now visible to others. This can be slow, +as it will include the time to upload any outstanding data from the +client, and for the filesystem itself to process it. + +Often Filesystems only offer the `Syncable.hsync()` guarantees: persistence as +well as visibility. This means the time to return can be even greater. + +Application code MUST NOT call `hflush()` or `hsync()` at the end of every line +or, unless they are writing a WAL, at the end of every record. Use with care. + + +### `Syncable.hsync()` + +Similar to POSIX `fsync()`, this call saves the data in client's user buffer +all the way to the disk device (but the disk may have it in its cache). + +That is: it is a requirement for the underlying FS To save all the data to +the disk hardware itself, where it is expected to be durable. + +#### Preconditions + +```python +hasCapability(Stream, "hsync") +Stream.open else raise IOException +``` + +#### Postconditions + +```python +FS' = FS where data(path) == buffer +``` + +_Implementations are required to block until that write has been +acknowledged by the store._ + +This is so the caller can be confident that once the call has +returned successfully, the data has been written. + + + +## Interface `StreamCapabilities` + +```java +@InterfaceAudience.Public +@InterfaceStability.Evolving +``` + +The `org.apache.hadoop.fs.StreamCapabilities` interface exists to allow callers to dynamically +determine the behavior of a stream. + +```java + public boolean hasCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: + return supportFlush; + default: + return false; + } + } +``` + +Once a stream has been closed, a `hasCapability()` call MUST do one of + +* return the capabilities of the open stream. +* return false. + +That is: it MUST NOT raise an exception about the file being closed; + +See [pathcapabilities](pathcapabilities.html) for specifics on the `PathCapabilities` API; +the requirements are similar: a stream MUST NOT return true for a capability +for which it lacks support, be it because + +* The capability is unknown. +* The capability is known and known to be unsupported. + +Standard stream capabilities are defined in `StreamCapabilities`; +consult the javadocs for the complete set of options. + +| Name | Probes for support of | +|-------|---------| +| `dropbehind` | `CanSetDropBehind.setDropBehind()` | +| `hsync` | `Syncable.hsync()` | +| `hflush` | `Syncable.hflush()`. Deprecated: probe for `HSYNC` only. | +| `in:readahead` | `CanSetReadahead.setReadahead()` | +| `in:unbuffer"` | `CanUnbuffer.unbuffer()` | +| `in:readbytebuffer` | `ByteBufferReadable#read(ByteBuffer)` | +| `in:preadbytebuffer` | `ByteBufferPositionedReadable#read(long, ByteBuffer)` | + +Stream implementations MAY add their own custom options. +These MUST be prefixed with `fs.SCHEMA.`, where `SCHEMA` is the schema of the filesystem. + +## interface `CanSetDropBehind` + +```java +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface CanSetDropBehind { + /** + * Configure whether the stream should drop the cache. + * + * @param dropCache Whether to drop the cache. null means to use the + * default value. + * @throws IOException If there was an error changing the dropBehind + * setting. + * UnsupportedOperationException If this stream doesn't support + * setting the drop-behind. + */ + void setDropBehind(Boolean dropCache) + throws IOException, UnsupportedOperationException; +} +``` + +This interface allows callers to change policies used inside HDFS. + +Implementations MUST return `true` for the call + +```java +StreamCapabilities.hasCapability("dropbehind"); +``` + + +## Durability, Concurrency, Consistency and Visibility of stream output. + +These are the aspects of the system behaviour which are not directly +covered in this (very simplistic) filesystem model, but which are visible +in production. + + +### Durability + +1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously +1. `OutputStream.flush()` flushes data to the destination. There +are no strict persistence requirements. +1. `Syncable.hflush()` synchronously sends all outstaning data to the destination +filesystem. After returning to the caller, the data MUST be visible to other readers, +it MAY be durable. That is: it does not have to be persisted, merely guaranteed +to be consistently visible to all clients attempting to open a new stream reading +data at the path. +1. `Syncable.hsync()` MUST transmit the data as per `hflush` and persist + that data to the underlying durable storage. +1. `close()` The first call to `close()` MUST flush out all remaining data in +the buffers, and persist it, as a call to `hsync()`. + + +Many applications call `flush()` far too often -such as at the end of every line written. +If this triggered an update of the data in persistent storage and any accompanying +metadata, distributed stores would overload fast. +Thus: `flush()` is often treated at most as a cue to flush data to the network +buffers -but not commit to writing any data. + +It is only the `Syncable` interface which offers guarantees. + +The two `Syncable` operations `hsync()` and `hflush()` differ purely by the extra guarantee of `hsync()`: the data must be persisted. +If `hsync()` is implemented, then `hflush()` can be implemented simply +by invoking `hsync()` + +```java +public void hflush() throws IOException { + hsync(); +} +``` + +This is perfectly acceptable as an implementation: the semantics of `hflush()` +are satisifed. +What is not acceptable is downgrading `hsync()` to `hflush()`, as the durability guarantee is no longer met. + + +### Concurrency + +1. The outcome of more than one process writing to the same file is undefined. + +1. An input stream opened to read a file *before the file was opened for writing* +MAY fetch data updated by writes to an OutputStream. +Because of buffering and caching, this is not a requirement +—and if an input stream does pick up updated data, the point at +which the updated data is read is undefined. This surfaces in object stores +where a `seek()` call which closes and re-opens the connection may pick up +updated data, while forward stream reads do not. Similarly, in block-oriented +filesystems, the data may be cached a block at a time —and changes only picked +up when a different block is read. + +1. A filesystem MAY allow the destination path to be manipulated while a stream +is writing to it —for example, `rename()` of the path or a parent; `delete()` of +a path or parent. In such a case, the outcome of future write operations on +the output stream is undefined. Some filesystems MAY implement locking to +prevent conflict. However, this tends to be rare on distributed filesystems, +for reasons well known in the literature. + +1. The Java API specification of `java.io.OutputStream` does not require +an instance of the class to be thread safe. +However, `org.apache.hadoop.hdfs.DFSOutputStream` +has a stronger thread safety model (possibly unintentionally). This fact is +relied upon in Apache HBase, as discovered in HADOOP-11708. Implementations +SHOULD be thread safe. *Note*: even the `DFSOutputStream` synchronization +model permits the output stream to have `close()` invoked while awaiting an +acknowledgement from datanode or namenode writes in an `hsync()` operation. + +### Consistency and Visibility + +There is no requirement for the data to be immediately visible to other applications +—not until a specific call to flush buffers or persist it to the underlying storage +medium are made. + +If an output stream is created with `FileSystem.create(path, overwrite==true)` +and there is an existing file at the path, that is `exists(FS, path)` holds, +then, the existing data is immediately unavailable; the data at the end of the +path MUST consist of an empty byte sequence `[]`, with consistent metadata. + + +```python +exists(FS, path) +(Stream', FS') = create(FS, path) +exists(FS', path) +getFileStatus(FS', path).getLen() = 0 +``` + +The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent +with the contents of the file after `flush()` and `sync()`. + +```python +(Stream', FS') = create(FS, path) +(Stream'', FS'') = write(Stream', data) +(Stream''', FS''') hsync(Stream'') +exists(FS''', path) +getFileStatus(FS''', path).getLen() = len(data) +``` + +*HDFS does not do this except when the write crosses a block boundary*; to do +otherwise would overload the Namenode. Other stores MAY copy this behavior. + +As a result, while a file is being written +`length(Filesystem, Path)` MAY be less than the length of `data(Filesystem, Path)`. + +The metadata MUST be consistent with the contents of a file after the `close()` +operation. + +After the contents of an output stream have been persisted (`hflush()/hsync()`) +all new `open(FS, Path)` operations MUST return the updated data. + +After `close()` has been invoked on an output stream, +a call to `getFileStatus(path)` MUST return the final metadata of the written file, +including length and modification time. +The metadata of the file returned in any of the FileSystem `list` operations +MUST be consistent with this metadata. + +The value of `getFileStatus(path).getModificationTime()` is not defined +while a stream is being written to. +The timestamp MAY be updated while a file is being written, +especially after a `Syncable.hsync()` call. +The timestamps MUST be updated after the file is closed +to that of a clock value observed by the server during the `close()` call. +It is *likely* to be in the time and time zone of the filesystem, rather +than that of the client. + +Formally, if a `close()` operation triggers an interaction with a server +which starts at server-side time `t1` and completes at time `t2` with a successfully +written file, then the last modification time SHOULD be a time `t` where +`t1 <= t <= t2` + +## Issues with the Hadoop Output Stream model. + +There are some known issues with the output stream model as offered by Hadoop, +specifically about the guarantees about when data is written and persisted +—and when the metadata is synchronized. +These are where implementation aspects of HDFS and the "Local" filesystem +do not follow the simple model of the filesystem used in this specification. + +### HDFS + +#### HDFS: `hsync()` only syncs the latest block + +The reference implementation, `DFSOutputStream` will block until an +acknowledgement is received from the datanodes: that is, all hosts in the +replica write chain have successfully written the file. + +That means that the expectation callers may have is that the return of the +method call contains visibility and durability guarantees which other +implementations must maintain. + +Note, however, that the reference `DFSOutputStream.hsync()` call only actually +persists *the current block*. If there have been a series of writes since the +last sync, such that a block boundary has been crossed. The `hsync()` call +claims only to write the most recent. + +From the javadocs of `DFSOutputStream.hsync(EnumSet syncFlags)` + +> Note that only the current block is flushed to the disk device. +> To guarantee durable sync across block boundaries the stream should +> be created with {@link CreateFlag#SYNC_BLOCK}. + + +This is an important HDFS implementation detail which must not be ignored by +anyone relying on HDFS to provide a Write-Ahead-Log or other database structure +where the requirement of the application is that +"all preceeding bytes MUST have been persisted before the commit flag in the WAL +is flushed" + +See [Stonebraker81], Michael Stonebraker, _Operating System Support for Database Management_, +1981, for a discussion on this topic. + +If you do need `hsync()` to have synced every block in a very large write, call +it regularly. + +#### HDFS: delayed visibility of metadata updates. + +That HDFS file metadata often lags the content of a file being written +to is not something everyone expects, nor convenient for any program trying +to pick up updated data in a file being written. Most visible is the length +of a file returned in the various `list` commands and `getFileStatus` —this +is often out of date. + +As HDFS only supports file growth in its output operations, this means +that the size of the file as listed in the metadata may be less than or equal +to the number of available bytes —but never larger. This is a guarantee which +is also held + +One algorithm to determine whether a file in HDFS is updated is: + +1. Remember the last read position `pos` in the file, using `0` if this is the initial +read. +1. Use `getFileStatus(FS, Path)` to query the updated length of the file as +recorded in the metadata. +1. If `Status.length > pos`, the file has grown. +1. If the number has not changed, then + 1. Reopen the file. + 1. `seek(pos)` to that location + 1. If `read() != -1`, there is new data. + +This algorithm works for filesystems which are consistent with metadata and +data, as well as HDFS. What is important to know is that, for an open file +`getFileStatus(FS, path).getLen() == 0` does not imply that `data(FS, path)` is +empty. + +When an output stream in HDFS is closed; the newly written data is not immediately +written to disk unless HDFS is deployed with `dfs.datanode.synconclose` set to +true. Otherwise it is cached and written to disk later. + +### Local Filesystem, `file:` + +`LocalFileSystem`, `file:`, (or any other `FileSystem` implementation based on +`ChecksumFileSystem`) has a different issue. If an output stream +is obtained from `create()` and `FileSystem.setWriteChecksum(false)` has +*not* been called on the filesystem, then the stream only flushes as much +local data as can be written to full checksummed blocks of data. + +That is, the hsync/hflush operations are not guaranteed to write all the pending +data until the file is finally closed. + +For this reason, the local fileystem accessed via `file://` URLs +does not support `Syncable` unless `setWriteChecksum(false)` was +called on that FileSystem instance so as to disable checksum creation. +After which, obviously, checksums are not generated for any file. +Is +### Checksummed output streams + +Because `org.apache.hadoop.fs.FSOutputSummer` and +`org.apache.hadoop.fs.ChecksumFileSystem.ChecksumFSOutputSummer` +implement the underlying checksummed output stream used by HDFS and +other filesystems, it provides some of the core semantics of the output +stream behavior. + +1. The `close()` call is unsynchronized, re-entrant and may attempt +to close the stream more than once. +1. It is possible to call `write(int)` on a closed stream (but not +`write(byte[], int, int)`). +1. It is possible to call `flush()` on a closed stream. + +Behaviors 1 and 2 really have to be considered bugs to fix, albeit with care. + +Behavior 3 has to be considered a defacto standard, for other implementations +to copy. + +### Object Stores + +Object store streams MAY buffer the entire stream's output +until the final `close()` operation triggers a single `PUT` of the data +and materialization of the final output. + +This significantly change's their behaviour compared to that of +POSIX filesystems and that specified in this document. + +#### Visibility of newly created objects + +There is no guarantee that any file will be visible at the path of an output +stream after the output stream is created . + +That is: while `create(FS, path, boolean)` returns a new stream + +```python +Stream' = (path, true, []) +``` + +The other postcondition of the operation, `data(FS', path) == []` MAY NOT +hold, in which case: + +1. `exists(FS, p)` MAY return false. +1. If a file was created with `overwrite = True`, the existing data MAY still +be visible: `data(FS', path) = data(FS, path)`. + +1. The check for existing data in a `create()` call with `overwrite=False`, may +take place in the `create()` call itself, in the `close()` call prior to/during +the write, or at some point in between. In the special case that the +object store supports an atomic `PUT` operation, the check for existence of +existing data and the subsequent creation of data at the path contains a race +condition: other clients may create data at the path between the existence check +and the subsequent write. + +1. Calls to `create(FS, Path, overwrite=false)` MAY succeed, returning a new +`OutputStream`, even while another stream is open and writing to the destination +path. + +This allows for the following sequence of operations, which would +raise an exception in the second `open()` call if invoked against HDFS: + +```python +Stream1 = open(FS, path, false) +sleep(200) +Stream2 = open(FS, path, false) +Stream.write('a') +Stream1.close() +Stream2.close() +``` + +For anyone wondering why the clients don't create a 0-byte file in the `create()` call, +it would cause problems after `close()` —the marker file could get +returned in `open()` calls instead of the final data. + +#### Visibility of the output of a stream after `close()` + +One guarantee which Object Stores SHOULD make is the same as those of POSIX +filesystems: After a stream `close()` call returns, the data MUST be persisted +durably and visible to all callers. Unfortunately, even that guarantee is +not always met: + +1. Existing data on a path MAY be visible for an indeterminate period of time. + +1. If the store has any form of create inconsistency or buffering of negative +existence probes, then even after the stream's `close()` operation has returned, +`getFileStatus(FS, path)` and `open(FS, path)` may fail with a `FileNotFoundException`. + +In their favour, the atomicity of the store's PUT operations do offer their +own guarantee: a newly created object is either absent or all of its data +is present: the act of instantiating the object, while potentially exhibiting +create inconsistency, is atomic. Applications may be able to use that fact +to their advantage. + +## Implementors notes. + +### Always implement `Syncable` -even if just to throw `UnsupportedOperationException` + +Because `FSDataOutputStream` silently downgrades `Syncable.hflush()` +and `Syncable.hsync()` to `wrappedStream.flush()`, callers of the +API MAY be misled into believing that their data has been flushed/synced +after syncing to a stream which does not support the APIs. + +Implementations SHOULD implement the API but +throw `UnsupportedOperationException`. + +### `StreamCapabilities` + +Implementors of filesystem clients SHOULD implement the `StreamCapabilities` +interface and its `hasCapabilities()` method to to declare whether or not +an output streams offer the visibility and durability guarantees of `Syncable`. + +Implementors of `StreamCapabilities.hasCapabilities()` MUST NOT declare that +they support the `hflush` and `hsync` capabilities on streams where this is not true. + +Sometimes streams pass their data to store, but the far end may not +sync it all the way to disk. That is not something the client can determine. +Here: if the client code is making the hflush/hsync passes these requests +on to the distributed FS, it SHOULD declare that it supports them. + +### Metadata updates + +Implementors MAY NOT update a file's metadata (length, date, ...) after +every `hsync()` call. HDFS doesn't, except when the written data crosses +a block boundary. + + + +### Does `close()` synchronize and persist data? + +By default, HDFS does not immediately data to disk when a stream is closed; it will +be asynchronously saved to disk. + +This does not mean that users do not expect it. + +The behavior as implemented is similar to the write-back aspect's of NFS's +[caching](https://docstore.mik.ua/orelly/networking_2ndEd/nfs/ch07_04.htm). +`DFSClient.close()` is performing an `hflush()` to the client to upload +all data to the datanodes. + +1. `close()` SHALL return once the guarantees of `hflush()` are met: the data is + visible to others. +1. For durability guarantees, `hsync()` MUST be called first. \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java index 79222ce67d..85e1f84999 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java @@ -18,23 +18,31 @@ package org.apache.hadoop.fs.contract; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; + import org.junit.Test; import org.junit.AssumptionViolatedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.io.IOException; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** * Test creating files, overwrite options etc. @@ -42,6 +50,9 @@ public abstract class AbstractContractCreateTest extends AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractCreateTest.class); + /** * How long to wait for a path to become visible. */ @@ -436,4 +447,145 @@ private void createFile(Path path) throws IOException { writeDataset(fs, path, data, data.length, 1024 * 1024, true); } + + @Test + public void testSyncable() throws Throwable { + describe("test declared and actual Syncable behaviors"); + FileSystem fs = getFileSystem(); + boolean supportsFlush = isSupported(SUPPORTS_HFLUSH); + boolean supportsSync = isSupported(SUPPORTS_HSYNC); + boolean metadataUpdatedOnHSync = isSupported(METADATA_UPDATED_ON_HSYNC); + + validateSyncableSemantics(fs, + supportsSync, + supportsFlush, + metadataUpdatedOnHSync); + } + + /** + * Validate the semantics of syncable. + * @param fs filesystem + * @param supportsSync sync is present + * @param supportsFlush flush is present. + * @param metadataUpdatedOnHSync Is the metadata updated after an hsync? + * @throws IOException failure + */ + protected void validateSyncableSemantics(final FileSystem fs, + final boolean supportsSync, + final boolean supportsFlush, + final boolean metadataUpdatedOnHSync) + throws IOException { + Path path = methodPath(); + LOG.info("Expecting files under {} to have supportsSync={}" + + " and supportsFlush={}; metadataUpdatedOnHSync={}", + path, supportsSync, supportsFlush, metadataUpdatedOnHSync); + + try (FSDataOutputStream out = fs.create(path, true)) { + LOG.info("Created output stream {}", out); + + // probe stream for support for flush/sync, whose capabilities + // of supports/does not support must match what is expected + String[] hflushCapabilities = { + StreamCapabilities.HFLUSH + }; + String[] hsyncCapabilities = { + StreamCapabilities.HSYNC + }; + if (supportsFlush) { + assertCapabilities(out, hflushCapabilities, null); + } else { + assertCapabilities(out, null, hflushCapabilities); + } + if (supportsSync) { + assertCapabilities(out, hsyncCapabilities, null); + } else { + assertCapabilities(out, null, hsyncCapabilities); + } + + // write one byte, then hflush it + out.write('a'); + try { + out.hflush(); + if (!supportsFlush) { + // FSDataOutputStream silently downgrades to flush() here. + // This is not good, but if changed some applications + // break writing to some stores. + LOG.warn("FS doesn't support Syncable.hflush()," + + " but doesn't reject it either."); + } + } catch (UnsupportedOperationException e) { + if (supportsFlush) { + throw new AssertionError("hflush not supported", e); + } + } + + // write a second byte, then hsync it. + out.write('b'); + try { + out.hsync(); + } catch (UnsupportedOperationException e) { + if (supportsSync) { + throw new AssertionError("HSync not supported", e); + } + } + + if (supportsSync) { + // if sync really worked, data MUST be visible here + + // first the metadata which MUST be present + final FileStatus st = fs.getFileStatus(path); + if (metadataUpdatedOnHSync) { + // not all stores reliably update it, HDFS/webHDFS in particular + assertEquals("Metadata not updated during write " + st, + 2, st.getLen()); + } + + // there's no way to verify durability, but we can + // at least verify a new file input stream reads + // the data + try (FSDataInputStream in = fs.open(path)) { + assertEquals('a', in.read()); + assertEquals('b', in.read()); + assertEquals(-1, in.read()); + LOG.info("Successfully read synced data on a new reader {}", in); + } + } else { + // no sync. Let's do a flush and see what happens. + out.flush(); + // Now look at the filesystem. + try (FSDataInputStream in = fs.open(path)) { + int c = in.read(); + if (c == -1) { + // nothing was synced; sync and flush really aren't there. + LOG.info("sync and flush are declared unsupported" + + " -flushed changes were not saved"); + + } else { + LOG.info("sync and flush are declared unsupported" + + " - but the stream does offer some sync/flush semantics"); + } + // close outside a finally as we do want to see any exception raised. + in.close(); + + } catch (FileNotFoundException e) { + // that's OK if it's an object store, but not if its a real + // FS + if (!isSupported(IS_BLOBSTORE)) { + throw e; + } else { + LOG.warn( + "Output file was not created; this is an object store with different" + + " visibility semantics"); + } + } + } + // close the output stream + out.close(); + + final String stats = ioStatisticsSourceToString(out); + if (!stats.isEmpty()) { + LOG.info("IOStatistics {}", stats); + } + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java index 3f31c07742..29cd29dfaf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java @@ -241,4 +241,19 @@ public interface ContractOptions { */ String TEST_RANDOM_SEEK_COUNT = "test.random-seek-count"; + /** + * Is hflush supported in API and StreamCapabilities? + */ + String SUPPORTS_HFLUSH = "supports-hflush"; + + /** + * Is hsync supported in API and StreamCapabilities? + */ + String SUPPORTS_HSYNC = "supports-hsync"; + + /** + * Is the metadata updated after an hsync? + * HDFS does not do this. + */ + String METADATA_UPDATED_ON_HSYNC = "metadata_updated_on_hsync"; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 39a41d01c4..c8cf19758f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1542,19 +1542,49 @@ public static void assertCapabilities( StreamCapabilities source = (StreamCapabilities) stream; if (shouldHaveCapabilities != null) { for (String shouldHaveCapability : shouldHaveCapabilities) { - assertTrue("Should have capability: " + shouldHaveCapability, + assertTrue("Should have capability: " + shouldHaveCapability + + " in " + source, source.hasCapability(shouldHaveCapability)); } } if (shouldNotHaveCapabilities != null) { for (String shouldNotHaveCapability : shouldNotHaveCapabilities) { - assertFalse("Should not have capability: " + shouldNotHaveCapability, + assertFalse("Should not have capability: " + shouldNotHaveCapability + + " in " + source, source.hasCapability(shouldNotHaveCapability)); } } } + + /** + * Custom assert to verify capabilities supported by + * an object through {@link StreamCapabilities}. + * + * @param source The object to test for StreamCapabilities + * @param capabilities The list of expected capabilities + */ + public static void assertHasStreamCapabilities( + final Object source, + final String... capabilities) { + assertCapabilities(source, capabilities, null); + } + + /** + * Custom assert to verify capabilities NOT supported by + * an object through {@link StreamCapabilities}. + * + * @param source The object to test for StreamCapabilities + * @param capabilities The list of capabilities which must not be + * supported. + */ + public static void assertLacksStreamCapabilities( + final Object source, + final String... capabilities) { + assertCapabilities(source, null, capabilities); + } + /** * Custom assert to test {@link PathCapabilities}. * @@ -1569,7 +1599,8 @@ public static void assertHasPathCapabilities( for (String shouldHaveCapability: capabilities) { assertTrue("Should have capability: " + shouldHaveCapability - + " under " + path, + + " under " + path + + " in " + source, source.hasPathCapability(path, shouldHaveCapability)); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractCreate.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractCreate.java index f8eeb961e9..3cea68c221 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractCreate.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractCreate.java @@ -18,7 +18,10 @@ package org.apache.hadoop.fs.contract.localfs; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.contract.AbstractContractCreateTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -29,4 +32,17 @@ protected AbstractFSContract createContract(Configuration conf) { return new LocalFSContract(conf); } + @Test + public void testSyncablePassthroughIfChecksumDisabled() throws Throwable { + describe("Create an instance of the local fs, disable the checksum" + + " and verify that Syncable now works"); + LocalFileSystem fs = (LocalFileSystem) getFileSystem(); + try (LocalFileSystem lfs = new LocalFileSystem( + fs.getRawFileSystem())) { + // disable checksumming output + lfs.setWriteChecksum(false); + // now the filesystem supports Sync with immediate update of file status + validateSyncableSemantics(lfs, true, true, true); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml index b261a63be7..03bb3e800f 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml @@ -121,4 +121,14 @@ case sensitivity and permission options are determined at run time from OS type true + + fs.contract.supports-settimes + true + + + + fs.contract.supports-getfilestatus + true + + diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml index 8cbd4a0abc..198ca566e2 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml @@ -127,4 +127,19 @@ true + + fs.contract.supports-hflush + true + + + + fs.contract.supports-hsync + true + + + + fs.contract.metadata_updated_on_hsync + true + + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index e85e2a299d..9003e51139 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -66,7 +67,6 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.htrace.core.TraceScope; import org.slf4j.Logger; @@ -560,13 +560,7 @@ void endBlock() throws IOException { @Override public boolean hasCapability(String capability) { - switch (StringUtils.toLowerCase(capability)) { - case StreamCapabilities.HSYNC: - case StreamCapabilities.HFLUSH: - return true; - default: - return false; - } + return StoreImplementationUtils.isProbeForSyncable(capability); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml index 3c9fcccc73..28721f7574 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml @@ -116,4 +116,19 @@ true + + fs.contract.supports-hflush + true + + + + fs.contract.supports-hsync + true + + + + fs.contract.metadata_updated_on_hsync + false + + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 5784ab8615..9a1a9403bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -37,6 +37,8 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.UploadPartRequest; + +import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; @@ -70,15 +72,20 @@ * is instead done as a single PUT operation. * * Unstable: statistics and error handling might evolve. + * + * Syncable is declared as supported so the calls can be + * explicitly rejected. */ @InterfaceAudience.Private @InterfaceStability.Unstable class S3ABlockOutputStream extends OutputStream implements - StreamCapabilities, IOStatisticsSource { + StreamCapabilities, IOStatisticsSource, Syncable { private static final Logger LOG = LoggerFactory.getLogger(S3ABlockOutputStream.class); + private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable"; + /** Owner FileSystem. */ private final S3AFileSystem fs; @@ -546,6 +553,16 @@ public boolean hasCapability(String capability) { } } + @Override + public void hflush() throws IOException { + throw new UnsupportedOperationException(E_NOT_SYNCABLE); + } + + @Override + public void hsync() throws IOException { + throw new UnsupportedOperationException(E_NOT_SYNCABLE); + } + @Override public IOStatistics getIOStatistics() { return iostatistics; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 0e8d864d4c..358ec261cc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -116,5 +116,5 @@ private InternalConstants() { * problems related to region/endpoint setup, it is currently * disabled. */ - public static final boolean AWS_SDK_METRICS_ENABLED = false; + public static final boolean AWS_SDK_METRICS_ENABLED = true; } diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java index 2b89fb0a73..dd4495319d 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import java.io.IOException; @@ -42,7 +44,8 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public final class AdlFsOutputStream extends OutputStream implements Syncable { +public final class AdlFsOutputStream extends OutputStream + implements Syncable, StreamCapabilities { private final ADLFileOutputStream out; public AdlFsOutputStream(ADLFileOutputStream out, Configuration configuration) @@ -79,4 +82,9 @@ public synchronized void hflush() throws IOException { public synchronized void hsync() throws IOException { out.flush(); } + + @Override + public boolean hasCapability(String capability) { + return StoreImplementationUtils.isProbeForSyncable(capability); + } } diff --git a/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml b/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml index 43de5bb918..4f5c99fbe0 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml +++ b/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml @@ -153,4 +153,14 @@ true + + fs.contract.supports-hflush + true + + + + fs.contract.supports-hsync + true + + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index 8fe080dbce..5412c05440 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.UUID; import java.util.Random; import java.util.concurrent.ConcurrentLinkedDeque; @@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -551,13 +551,7 @@ public boolean hasCapability(String capability) { if (!compactionEnabled) { return false; } - switch (capability.toLowerCase(Locale.ENGLISH)) { - case StreamCapabilities.HSYNC: - case StreamCapabilities.HFLUSH: - return true; - default: - return false; - } + return StoreImplementationUtils.isProbeForSyncable(capability); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 8ba2223077..48ef495d7b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.azure.security.Constants; import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager; import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -1052,10 +1053,7 @@ public void hsync() throws IOException { */ @Override // StreamCapability public boolean hasCapability(String capability) { - if (out instanceof StreamCapabilities) { - return ((StreamCapabilities) out).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(out, capability); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java index 14ddb02fc4..f8aed2612a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; /** * Support the Syncable interface on top of a DataOutputStream. @@ -56,10 +57,7 @@ public OutputStream getOutStream() { @Override public boolean hasCapability(String capability) { - if (out instanceof StreamCapabilities) { - return ((StreamCapabilities) out).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(out, capability); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 53bdfe94cf..2d02019ab1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -24,7 +24,6 @@ import java.io.OutputStream; import java.net.HttpURLConnection; import java.nio.ByteBuffer; -import java.util.Locale; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ExecutorCompletionService; @@ -54,6 +53,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; import static org.apache.hadoop.io.IOUtils.wrapException; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE; @@ -164,13 +164,7 @@ public AbfsOutputStream( */ @Override public boolean hasCapability(String capability) { - switch (capability.toLowerCase(Locale.ENGLISH)) { - case StreamCapabilities.HSYNC: - case StreamCapabilities.HFLUSH: - return supportFlush; - default: - return false; - } + return supportFlush && isProbeForSyncable(capability); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java index b8edc4b7d6..835b82c3c1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java @@ -27,8 +27,6 @@ import com.microsoft.azure.storage.blob.BlockListingFilter; import com.microsoft.azure.storage.blob.CloudBlockBlob; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -39,8 +37,9 @@ import org.hamcrest.core.IsNot; import org.junit.Test; -import static org.junit.Assert.*; -import static org.junit.Assume.assumeNotNull; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities; /** * Test semantics of functions flush, hflush, hsync, and close for block blobs, @@ -192,11 +191,14 @@ public void testPageBlobClose() throws IOException { public void testPageBlobCapabilities() throws IOException { Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); try (FSDataOutputStream stream = fs.create(path)) { - assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); - assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); - assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); - assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); - assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + assertCapabilities(stream, + new String[]{ + StreamCapabilities.HFLUSH, + StreamCapabilities.HSYNC, + StreamCapabilities.DROPBEHIND, + StreamCapabilities.READAHEAD, + StreamCapabilities.UNBUFFER}, + null); stream.write(getRandomBytes()); } } @@ -285,11 +287,12 @@ public void testBlockBlobClose() throws IOException { public void testBlockBlobCapabilities() throws IOException { Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); try (FSDataOutputStream stream = fs.create(path)) { - assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH)); - assertFalse(stream.hasCapability(StreamCapabilities.HSYNC)); - assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); - assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); - assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + assertLacksStreamCapabilities(stream, + StreamCapabilities.HFLUSH, + StreamCapabilities.HSYNC, + StreamCapabilities.DROPBEHIND, + StreamCapabilities.READAHEAD, + StreamCapabilities.UNBUFFER); stream.write(getRandomBytes()); } } @@ -381,11 +384,12 @@ public void testBlockBlobCompactionClose() throws IOException { public void testBlockBlobCompactionCapabilities() throws IOException { Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); try (FSDataOutputStream stream = fs.create(path)) { - assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); - assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); - assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); - assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); - assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + assertHasStreamCapabilities(stream, + StreamCapabilities.HFLUSH, + StreamCapabilities.HSYNC, + StreamCapabilities.DROPBEHIND, + StreamCapabilities.READAHEAD, + StreamCapabilities.UNBUFFER); stream.write(getRandomBytes()); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index 92aa5520ee..d8f0dc28dd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -41,6 +41,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities; + /** * Test flush operation. * This class cannot be run in parallel test mode--check comments in @@ -306,11 +309,12 @@ public void testStreamCapabilitiesWithFlushDisabled() throws Exception { final Path testFilePath = path(methodName.getMethodName()); try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) { - assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH)); - assertFalse(stream.hasCapability(StreamCapabilities.HSYNC)); - assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); - assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); - assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + assertLacksStreamCapabilities(stream, + StreamCapabilities.HFLUSH, + StreamCapabilities.HSYNC, + StreamCapabilities.DROPBEHIND, + StreamCapabilities.READAHEAD, + StreamCapabilities.UNBUFFER); } } @@ -320,11 +324,12 @@ public void testStreamCapabilitiesWithFlushEnabled() throws Exception { byte[] buffer = getRandomBytesArray(); final Path testFilePath = path(methodName.getMethodName()); try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) { - assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); - assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); - assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); - assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); - assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + assertHasStreamCapabilities(stream, + StreamCapabilities.HFLUSH, + StreamCapabilities.HSYNC, + StreamCapabilities.DROPBEHIND, + StreamCapabilities.READAHEAD, + StreamCapabilities.UNBUFFER); } } diff --git a/hadoop-tools/hadoop-azure/src/test/resources/abfs.xml b/hadoop-tools/hadoop-azure/src/test/resources/abfs.xml index 1561da2234..f06e5cac9b 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/abfs.xml +++ b/hadoop-tools/hadoop-azure/src/test/resources/abfs.xml @@ -66,4 +66,20 @@ fs.contract.supports-unbuffer true + + + fs.contract.supports-hflush + true + + + + fs.contract.supports-hsync + true + + + + fs.contract.metadata_updated_on_hsync + true + +