HADOOP-13327 Output Stream Specification. (#2587)
This defines what output streams and especially those which implement Syncable are meant to do, and documents where implementations (HDFS; S3) don't. With tests. The file:// FileSystem now supports Syncable if an application calls FileSystem.setWriteChecksum(false) before creating a file -checksumming and Syncable.hsync() are incompatible. Contributed by Steve Loughran. Change-Id: I892d768de6268f4dd6f175b3fe3b7e5bcaa91194
This commit is contained in:
parent
37971c71d1
commit
98e4d516ea
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
@ -312,10 +313,7 @@ private void freeBuffers() {
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
if (out instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) out).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
return StoreImplementationUtils.hasCapability(out, capability);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.EOFException;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.IOException;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
@ -153,4 +154,12 @@ public boolean hasCapability(final String capability) {
|
||||
public IOStatistics getIOStatistics() {
|
||||
return retrieveIOStatistics(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ",
|
||||
BufferedFSInputStream.class.getSimpleName() + "[", "]")
|
||||
.add("in=" + in)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,6 @@ public interface CanSetDropBehind {
|
||||
* UnsupportedOperationException If this stream doesn't support
|
||||
* setting the drop-behind.
|
||||
*/
|
||||
public void setDropBehind(Boolean dropCache)
|
||||
void setDropBehind(Boolean dropCache)
|
||||
throws IOException, UnsupportedOperationException;
|
||||
}
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
|
||||
|
||||
/****************************************************************
|
||||
* Abstract Checksumed FileSystem.
|
||||
@ -479,12 +480,15 @@ public IOStatistics getIOStatistics() {
|
||||
|
||||
/**
|
||||
* Probe the inner stream for a capability.
|
||||
*
|
||||
* Syncable operations are rejected before being passed down.
|
||||
* @param capability string to query the stream support for.
|
||||
* @return true if a capability is known to be supported.
|
||||
*/
|
||||
@Override
|
||||
public boolean hasCapability(final String capability) {
|
||||
if (isProbeForSyncable(capability)) {
|
||||
return false;
|
||||
}
|
||||
return datas.hasCapability(capability);
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
|
||||
@ -237,10 +238,7 @@ public void unbuffer() {
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
if (in instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) in).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
return StoreImplementationUtils.hasCapability(in, capability);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
|
||||
@ -126,10 +127,7 @@ public OutputStream getWrappedStream() {
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
if (wrappedStream instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) wrappedStream).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
return StoreImplementationUtils.hasCapability(wrappedStream, capability);
|
||||
}
|
||||
|
||||
@Override // Syncable
|
||||
|
@ -33,7 +33,8 @@
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||
@InterfaceStability.Unstable
|
||||
abstract public class FSOutputSummer extends OutputStream {
|
||||
abstract public class FSOutputSummer extends OutputStream implements
|
||||
StreamCapabilities {
|
||||
// data checksum
|
||||
private final DataChecksum sum;
|
||||
// internal buffer for storing data before it is checksumed
|
||||
@ -254,4 +255,9 @@ protected synchronized void setChecksumBufSize(int size) {
|
||||
protected synchronized void resetChecksumBufSize() {
|
||||
setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -43,10 +43,12 @@
|
||||
import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
@ -137,8 +139,13 @@ class LocalFSFileInputStream extends FSInputStream implements
|
||||
STREAM_READ_SKIP_BYTES)
|
||||
.build();
|
||||
|
||||
/** Reference to the bytes read counter for slightly faster counting. */
|
||||
private final AtomicLong bytesRead;
|
||||
|
||||
public LocalFSFileInputStream(Path f) throws IOException {
|
||||
fis = new FileInputStream(pathToFile(f));
|
||||
bytesRead = ioStatistics.getCounterReference(
|
||||
STREAM_READ_BYTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -161,8 +168,8 @@ public boolean seekToNewSource(long targetPos) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Just forward to the fis
|
||||
/**
|
||||
* Just forward to the fis.
|
||||
*/
|
||||
@Override
|
||||
public int available() throws IOException { return fis.available(); }
|
||||
@ -178,7 +185,7 @@ public int read() throws IOException {
|
||||
if (value >= 0) {
|
||||
this.position++;
|
||||
statistics.incrementBytesRead(1);
|
||||
ioStatistics.incrementCounter(STREAM_READ_BYTES);
|
||||
bytesRead.addAndGet(1);
|
||||
}
|
||||
return value;
|
||||
} catch (IOException e) { // unexpected exception
|
||||
@ -196,7 +203,7 @@ public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (value > 0) {
|
||||
this.position += value;
|
||||
statistics.incrementBytesRead(value);
|
||||
ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
|
||||
bytesRead.addAndGet(value);
|
||||
}
|
||||
return value;
|
||||
} catch (IOException e) { // unexpected exception
|
||||
@ -285,7 +292,7 @@ public FSDataInputStream open(PathHandle fd, int bufferSize)
|
||||
* For create()'s FSOutputStream.
|
||||
*********************************************************/
|
||||
final class LocalFSFileOutputStream extends OutputStream implements
|
||||
IOStatisticsSource, StreamCapabilities {
|
||||
IOStatisticsSource, StreamCapabilities, Syncable {
|
||||
private FileOutputStream fos;
|
||||
|
||||
/**
|
||||
@ -354,6 +361,21 @@ public void write(int b) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hflush() throws IOException {
|
||||
flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* HSync calls sync on fhe file descriptor after a local flush() call.
|
||||
* @throws IOException failure
|
||||
*/
|
||||
@Override
|
||||
public void hsync() throws IOException {
|
||||
flush();
|
||||
fos.getFD().sync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
// a bit inefficient, but intended to make it easier to add
|
||||
@ -362,7 +384,7 @@ public boolean hasCapability(String capability) {
|
||||
case StreamCapabilities.IOSTATISTICS:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
return StoreImplementationUtils.isProbeForSyncable(capability);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,11 @@
|
||||
public interface StreamCapabilities {
|
||||
/**
|
||||
* Stream hflush capability implemented by {@link Syncable#hflush()}.
|
||||
*
|
||||
* Use the {@link #HSYNC} probe to check for the support of Syncable;
|
||||
* it's that presence of {@code hsync()} which matters.
|
||||
*/
|
||||
@Deprecated
|
||||
String HFLUSH = "hflush";
|
||||
|
||||
/**
|
||||
|
@ -23,20 +23,24 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/** This interface for flush/sync operation. */
|
||||
/**
|
||||
* This is the interface for flush/sync operations.
|
||||
* Consult the Hadoop filesystem specification for the definition of the
|
||||
* semantics of these operations.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@InterfaceStability.Stable
|
||||
public interface Syncable {
|
||||
|
||||
|
||||
/** Flush out the data in client's user buffer. After the return of
|
||||
* this call, new readers will see the data.
|
||||
* @throws IOException if any error occurs
|
||||
*/
|
||||
public void hflush() throws IOException;
|
||||
|
||||
void hflush() throws IOException;
|
||||
|
||||
/** Similar to posix fsync, flush out the data in client's user buffer
|
||||
* all the way to the disk device (but the disk may have it in its cache).
|
||||
* @throws IOException if error occurs
|
||||
*/
|
||||
public void hsync() throws IOException;
|
||||
void hsync() throws IOException;
|
||||
}
|
||||
|
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.impl;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
|
||||
import static org.apache.hadoop.fs.StreamCapabilities.HFLUSH;
|
||||
import static org.apache.hadoop.fs.StreamCapabilities.HSYNC;
|
||||
|
||||
/**
|
||||
* Utility classes to help implementing filesystems and streams.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public final class StoreImplementationUtils {
|
||||
|
||||
private StoreImplementationUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the probe capability being for {@link StreamCapabilities#HSYNC}
|
||||
* or {@link StreamCapabilities#HFLUSH}
|
||||
* {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality.
|
||||
* @param capability capability string.
|
||||
* @return true if either refers to one of the Syncable operations.
|
||||
*/
|
||||
public static boolean isProbeForSyncable(String capability) {
|
||||
return capability.equalsIgnoreCase(HSYNC) ||
|
||||
capability.equalsIgnoreCase(HFLUSH);
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe for an object having a capability; returns true
|
||||
* if the stream implements {@link StreamCapabilities} and its
|
||||
* {@code hasCapabilities()} method returns true for the capability.
|
||||
* This is a package private method intended to provided a common
|
||||
* implementation for input and output streams.
|
||||
* {@link StreamCapabilities#hasCapability(String)} call is for public use.
|
||||
* @param object object to probe.
|
||||
* @param capability capability to probe for
|
||||
* @return true if the object implements stream capabilities and
|
||||
* declares that it supports the capability.
|
||||
*/
|
||||
static boolean objectHasCapability(Object object, String capability) {
|
||||
if (object instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) object).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe for an output stream having a capability; returns true
|
||||
* if the stream implements {@link StreamCapabilities} and its
|
||||
* {@code hasCapabilities()} method returns true for the capability.
|
||||
* @param out output stream
|
||||
* @param capability capability to probe for
|
||||
* @return true if the stream declares that it supports the capability.
|
||||
*/
|
||||
public static boolean hasCapability(OutputStream out, String capability) {
|
||||
return objectHasCapability(out, capability);
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe for an input stream having a capability; returns true
|
||||
* if the stream implements {@link StreamCapabilities} and its
|
||||
* {@code hasCapabilities()} method returns true for the capability.
|
||||
* @param in input stream
|
||||
* @param capability capability to probe for
|
||||
* @return true if the stream declares that it supports the capability.
|
||||
*/
|
||||
public static boolean hasCapability(InputStream in, String capability) {
|
||||
return objectHasCapability(in, capability);
|
||||
}
|
||||
|
||||
}
|
@ -664,11 +664,15 @@ For instance, HDFS may raise an `InvalidPathException`.
|
||||
|
||||
result = FSDataOutputStream
|
||||
|
||||
The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`.
|
||||
A zero byte file MUST exist at the end of the specified path, visible to all.
|
||||
|
||||
The updated (valid) FileSystem MUST contain all the parent directories of the path, as created by `mkdirs(parent(p))`.
|
||||
|
||||
The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
|
||||
`FS.Files[p]`
|
||||
|
||||
The behavior of the returned stream is covered in [Output](outputstream.html).
|
||||
|
||||
#### Implementation Notes
|
||||
|
||||
* Some implementations split the create into a check for the file existing
|
||||
@ -677,10 +681,18 @@ The result is `FSDataOutputStream`, which through its operations may generate ne
|
||||
clients creating files with `overwrite==true` to fail if the file is created
|
||||
by another client between the two tests.
|
||||
|
||||
* S3A, Swift and potentially other Object Stores do not currently change the FS state
|
||||
* S3A, Swift and potentially other Object Stores do not currently change the `FS` state
|
||||
until the output stream `close()` operation is completed.
|
||||
This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`,
|
||||
and potentially confuse file/directory logic
|
||||
This is a significant difference between the behavior of object stores
|
||||
and that of filesystems, as it allows >1 client to create a file with `overwrite=false`,
|
||||
and potentially confuse file/directory logic. In particular, using `create()` to acquire
|
||||
an exclusive lock on a file (whoever creates the file without an error is considered
|
||||
the holder of the lock) may not not a safe algorithm to use when working with object stores.
|
||||
|
||||
* Object stores may create an empty file as a marker when a file is created.
|
||||
However, object stores with `overwrite=true` semantics may not implement this atomically,
|
||||
so creating files with `overwrite=false` cannot be used as an implicit exclusion
|
||||
mechanism between processes.
|
||||
|
||||
* The Local FileSystem raises a `FileNotFoundException` when trying to create a file over
|
||||
a directory, hence it is listed as an exception that MAY be raised when
|
||||
@ -692,6 +704,8 @@ this precondition fails.
|
||||
|
||||
Make a `FSDataOutputStreamBuilder` to specify the parameters to create a file.
|
||||
|
||||
The behavior of the returned stream is covered in [Output](outputstream.html).
|
||||
|
||||
#### Implementation Notes
|
||||
|
||||
`createFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make
|
||||
@ -717,17 +731,21 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep
|
||||
|
||||
#### Postconditions
|
||||
|
||||
FS
|
||||
FS' = FS
|
||||
result = FSDataOutputStream
|
||||
|
||||
Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]`
|
||||
by appending data to the existing list.
|
||||
|
||||
The behavior of the returned stream is covered in [Output](outputstream.html).
|
||||
|
||||
### `FSDataOutputStreamBuilder appendFile(Path p)`
|
||||
|
||||
Make a `FSDataOutputStreamBuilder` to specify the parameters to append to an
|
||||
existing file.
|
||||
|
||||
The behavior of the returned stream is covered in [Output](outputstream.html).
|
||||
|
||||
#### Implementation Notes
|
||||
|
||||
`appendFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make
|
||||
|
@ -32,6 +32,7 @@ HDFS as these are commonly expected by Hadoop client applications.
|
||||
1. [Notation](notation.html)
|
||||
1. [Model](model.html)
|
||||
1. [FileSystem class](filesystem.html)
|
||||
1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
|
||||
1. [FSDataInputStream class](fsdatainputstream.html)
|
||||
1. [PathCapabilities interface](pathcapabilities.html)
|
||||
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -18,23 +18,31 @@
|
||||
|
||||
package org.apache.hadoop.fs.contract;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.AssumptionViolatedException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
|
||||
|
||||
/**
|
||||
* Test creating files, overwrite options etc.
|
||||
@ -42,6 +50,9 @@
|
||||
public abstract class AbstractContractCreateTest extends
|
||||
AbstractFSContractTestBase {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractContractCreateTest.class);
|
||||
|
||||
/**
|
||||
* How long to wait for a path to become visible.
|
||||
*/
|
||||
@ -436,4 +447,145 @@ private void createFile(Path path) throws IOException {
|
||||
writeDataset(fs, path, data, data.length, 1024 * 1024,
|
||||
true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncable() throws Throwable {
|
||||
describe("test declared and actual Syncable behaviors");
|
||||
FileSystem fs = getFileSystem();
|
||||
boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
|
||||
boolean supportsSync = isSupported(SUPPORTS_HSYNC);
|
||||
boolean metadataUpdatedOnHSync = isSupported(METADATA_UPDATED_ON_HSYNC);
|
||||
|
||||
validateSyncableSemantics(fs,
|
||||
supportsSync,
|
||||
supportsFlush,
|
||||
metadataUpdatedOnHSync);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the semantics of syncable.
|
||||
* @param fs filesystem
|
||||
* @param supportsSync sync is present
|
||||
* @param supportsFlush flush is present.
|
||||
* @param metadataUpdatedOnHSync Is the metadata updated after an hsync?
|
||||
* @throws IOException failure
|
||||
*/
|
||||
protected void validateSyncableSemantics(final FileSystem fs,
|
||||
final boolean supportsSync,
|
||||
final boolean supportsFlush,
|
||||
final boolean metadataUpdatedOnHSync)
|
||||
throws IOException {
|
||||
Path path = methodPath();
|
||||
LOG.info("Expecting files under {} to have supportsSync={}"
|
||||
+ " and supportsFlush={}; metadataUpdatedOnHSync={}",
|
||||
path, supportsSync, supportsFlush, metadataUpdatedOnHSync);
|
||||
|
||||
try (FSDataOutputStream out = fs.create(path, true)) {
|
||||
LOG.info("Created output stream {}", out);
|
||||
|
||||
// probe stream for support for flush/sync, whose capabilities
|
||||
// of supports/does not support must match what is expected
|
||||
String[] hflushCapabilities = {
|
||||
StreamCapabilities.HFLUSH
|
||||
};
|
||||
String[] hsyncCapabilities = {
|
||||
StreamCapabilities.HSYNC
|
||||
};
|
||||
if (supportsFlush) {
|
||||
assertCapabilities(out, hflushCapabilities, null);
|
||||
} else {
|
||||
assertCapabilities(out, null, hflushCapabilities);
|
||||
}
|
||||
if (supportsSync) {
|
||||
assertCapabilities(out, hsyncCapabilities, null);
|
||||
} else {
|
||||
assertCapabilities(out, null, hsyncCapabilities);
|
||||
}
|
||||
|
||||
// write one byte, then hflush it
|
||||
out.write('a');
|
||||
try {
|
||||
out.hflush();
|
||||
if (!supportsFlush) {
|
||||
// FSDataOutputStream silently downgrades to flush() here.
|
||||
// This is not good, but if changed some applications
|
||||
// break writing to some stores.
|
||||
LOG.warn("FS doesn't support Syncable.hflush(),"
|
||||
+ " but doesn't reject it either.");
|
||||
}
|
||||
} catch (UnsupportedOperationException e) {
|
||||
if (supportsFlush) {
|
||||
throw new AssertionError("hflush not supported", e);
|
||||
}
|
||||
}
|
||||
|
||||
// write a second byte, then hsync it.
|
||||
out.write('b');
|
||||
try {
|
||||
out.hsync();
|
||||
} catch (UnsupportedOperationException e) {
|
||||
if (supportsSync) {
|
||||
throw new AssertionError("HSync not supported", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (supportsSync) {
|
||||
// if sync really worked, data MUST be visible here
|
||||
|
||||
// first the metadata which MUST be present
|
||||
final FileStatus st = fs.getFileStatus(path);
|
||||
if (metadataUpdatedOnHSync) {
|
||||
// not all stores reliably update it, HDFS/webHDFS in particular
|
||||
assertEquals("Metadata not updated during write " + st,
|
||||
2, st.getLen());
|
||||
}
|
||||
|
||||
// there's no way to verify durability, but we can
|
||||
// at least verify a new file input stream reads
|
||||
// the data
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
assertEquals('a', in.read());
|
||||
assertEquals('b', in.read());
|
||||
assertEquals(-1, in.read());
|
||||
LOG.info("Successfully read synced data on a new reader {}", in);
|
||||
}
|
||||
} else {
|
||||
// no sync. Let's do a flush and see what happens.
|
||||
out.flush();
|
||||
// Now look at the filesystem.
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
int c = in.read();
|
||||
if (c == -1) {
|
||||
// nothing was synced; sync and flush really aren't there.
|
||||
LOG.info("sync and flush are declared unsupported"
|
||||
+ " -flushed changes were not saved");
|
||||
|
||||
} else {
|
||||
LOG.info("sync and flush are declared unsupported"
|
||||
+ " - but the stream does offer some sync/flush semantics");
|
||||
}
|
||||
// close outside a finally as we do want to see any exception raised.
|
||||
in.close();
|
||||
|
||||
} catch (FileNotFoundException e) {
|
||||
// that's OK if it's an object store, but not if its a real
|
||||
// FS
|
||||
if (!isSupported(IS_BLOBSTORE)) {
|
||||
throw e;
|
||||
} else {
|
||||
LOG.warn(
|
||||
"Output file was not created; this is an object store with different"
|
||||
+ " visibility semantics");
|
||||
}
|
||||
}
|
||||
}
|
||||
// close the output stream
|
||||
out.close();
|
||||
|
||||
final String stats = ioStatisticsSourceToString(out);
|
||||
if (!stats.isEmpty()) {
|
||||
LOG.info("IOStatistics {}", stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -241,4 +241,19 @@ public interface ContractOptions {
|
||||
*/
|
||||
String TEST_RANDOM_SEEK_COUNT = "test.random-seek-count";
|
||||
|
||||
/**
|
||||
* Is hflush supported in API and StreamCapabilities?
|
||||
*/
|
||||
String SUPPORTS_HFLUSH = "supports-hflush";
|
||||
|
||||
/**
|
||||
* Is hsync supported in API and StreamCapabilities?
|
||||
*/
|
||||
String SUPPORTS_HSYNC = "supports-hsync";
|
||||
|
||||
/**
|
||||
* Is the metadata updated after an hsync?
|
||||
* HDFS does not do this.
|
||||
*/
|
||||
String METADATA_UPDATED_ON_HSYNC = "metadata_updated_on_hsync";
|
||||
}
|
||||
|
@ -1542,19 +1542,49 @@ public static void assertCapabilities(
|
||||
StreamCapabilities source = (StreamCapabilities) stream;
|
||||
if (shouldHaveCapabilities != null) {
|
||||
for (String shouldHaveCapability : shouldHaveCapabilities) {
|
||||
assertTrue("Should have capability: " + shouldHaveCapability,
|
||||
assertTrue("Should have capability: " + shouldHaveCapability
|
||||
+ " in " + source,
|
||||
source.hasCapability(shouldHaveCapability));
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldNotHaveCapabilities != null) {
|
||||
for (String shouldNotHaveCapability : shouldNotHaveCapabilities) {
|
||||
assertFalse("Should not have capability: " + shouldNotHaveCapability,
|
||||
assertFalse("Should not have capability: " + shouldNotHaveCapability
|
||||
+ " in " + source,
|
||||
source.hasCapability(shouldNotHaveCapability));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Custom assert to verify capabilities supported by
|
||||
* an object through {@link StreamCapabilities}.
|
||||
*
|
||||
* @param source The object to test for StreamCapabilities
|
||||
* @param capabilities The list of expected capabilities
|
||||
*/
|
||||
public static void assertHasStreamCapabilities(
|
||||
final Object source,
|
||||
final String... capabilities) {
|
||||
assertCapabilities(source, capabilities, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom assert to verify capabilities NOT supported by
|
||||
* an object through {@link StreamCapabilities}.
|
||||
*
|
||||
* @param source The object to test for StreamCapabilities
|
||||
* @param capabilities The list of capabilities which must not be
|
||||
* supported.
|
||||
*/
|
||||
public static void assertLacksStreamCapabilities(
|
||||
final Object source,
|
||||
final String... capabilities) {
|
||||
assertCapabilities(source, null, capabilities);
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom assert to test {@link PathCapabilities}.
|
||||
*
|
||||
@ -1569,7 +1599,8 @@ public static void assertHasPathCapabilities(
|
||||
|
||||
for (String shouldHaveCapability: capabilities) {
|
||||
assertTrue("Should have capability: " + shouldHaveCapability
|
||||
+ " under " + path,
|
||||
+ " under " + path
|
||||
+ " in " + source,
|
||||
source.hasPathCapability(path, shouldHaveCapability));
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,10 @@
|
||||
|
||||
package org.apache.hadoop.fs.contract.localfs;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
|
||||
@ -29,4 +32,17 @@ protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new LocalFSContract(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncablePassthroughIfChecksumDisabled() throws Throwable {
|
||||
describe("Create an instance of the local fs, disable the checksum"
|
||||
+ " and verify that Syncable now works");
|
||||
LocalFileSystem fs = (LocalFileSystem) getFileSystem();
|
||||
try (LocalFileSystem lfs = new LocalFileSystem(
|
||||
fs.getRawFileSystem())) {
|
||||
// disable checksumming output
|
||||
lfs.setWriteChecksum(false);
|
||||
// now the filesystem supports Sync with immediate update of file status
|
||||
validateSyncableSemantics(lfs, true, true, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -121,4 +121,14 @@ case sensitivity and permission options are determined at run time from OS type
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-settimes</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-getfilestatus</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -127,4 +127,19 @@
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hflush</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hsync</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.metadata_updated_on_hsync</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -36,6 +36,7 @@
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
@ -66,7 +67,6 @@
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DataChecksum.Type;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.slf4j.Logger;
|
||||
@ -560,13 +560,7 @@ void endBlock() throws IOException {
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
switch (StringUtils.toLowerCase(capability)) {
|
||||
case StreamCapabilities.HSYNC:
|
||||
case StreamCapabilities.HFLUSH:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
return StoreImplementationUtils.isProbeForSyncable(capability);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,4 +116,19 @@
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hflush</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hsync</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.metadata_updated_on_hsync</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -37,6 +37,8 @@
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
|
||||
@ -70,15 +72,20 @@
|
||||
* is instead done as a single PUT operation.
|
||||
*
|
||||
* Unstable: statistics and error handling might evolve.
|
||||
*
|
||||
* Syncable is declared as supported so the calls can be
|
||||
* explicitly rejected.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
class S3ABlockOutputStream extends OutputStream implements
|
||||
StreamCapabilities, IOStatisticsSource {
|
||||
StreamCapabilities, IOStatisticsSource, Syncable {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(S3ABlockOutputStream.class);
|
||||
|
||||
private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable";
|
||||
|
||||
/** Owner FileSystem. */
|
||||
private final S3AFileSystem fs;
|
||||
|
||||
@ -546,6 +553,16 @@ public boolean hasCapability(String capability) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hflush() throws IOException {
|
||||
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hsync() throws IOException {
|
||||
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOStatistics getIOStatistics() {
|
||||
return iostatistics;
|
||||
|
@ -116,5 +116,5 @@ private InternalConstants() {
|
||||
* problems related to region/endpoint setup, it is currently
|
||||
* disabled.
|
||||
*/
|
||||
public static final boolean AWS_SDK_METRICS_ENABLED = false;
|
||||
public static final boolean AWS_SDK_METRICS_ENABLED = true;
|
||||
}
|
||||
|
@ -22,6 +22,8 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -42,7 +44,8 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public final class AdlFsOutputStream extends OutputStream implements Syncable {
|
||||
public final class AdlFsOutputStream extends OutputStream
|
||||
implements Syncable, StreamCapabilities {
|
||||
private final ADLFileOutputStream out;
|
||||
|
||||
public AdlFsOutputStream(ADLFileOutputStream out, Configuration configuration)
|
||||
@ -79,4 +82,9 @@ public synchronized void hflush() throws IOException {
|
||||
public synchronized void hsync() throws IOException {
|
||||
out.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
return StoreImplementationUtils.isProbeForSyncable(capability);
|
||||
}
|
||||
}
|
||||
|
@ -153,4 +153,14 @@
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hflush</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hsync</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -27,7 +27,6 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.UUID;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
@ -42,6 +41,7 @@
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -551,13 +551,7 @@ public boolean hasCapability(String capability) {
|
||||
if (!compactionEnabled) {
|
||||
return false;
|
||||
}
|
||||
switch (capability.toLowerCase(Locale.ENGLISH)) {
|
||||
case StreamCapabilities.HSYNC:
|
||||
case StreamCapabilities.HFLUSH:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
return StoreImplementationUtils.isProbeForSyncable(capability);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,6 +70,7 @@
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
@ -1052,10 +1053,7 @@ public void hsync() throws IOException {
|
||||
*/
|
||||
@Override // StreamCapability
|
||||
public boolean hasCapability(String capability) {
|
||||
if (out instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) out).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
return StoreImplementationUtils.hasCapability(out, capability);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
|
||||
|
||||
/**
|
||||
* Support the Syncable interface on top of a DataOutputStream.
|
||||
@ -56,10 +57,7 @@ public OutputStream getOutStream() {
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
if (out instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) out).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
return StoreImplementationUtils.hasCapability(out, capability);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,7 +24,6 @@
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
@ -54,6 +53,7 @@
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
|
||||
import static org.apache.hadoop.io.IOUtils.wrapException;
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
|
||||
@ -164,13 +164,7 @@ public AbfsOutputStream(
|
||||
*/
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
switch (capability.toLowerCase(Locale.ENGLISH)) {
|
||||
case StreamCapabilities.HSYNC:
|
||||
case StreamCapabilities.HFLUSH:
|
||||
return supportFlush;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
return supportFlush && isProbeForSyncable(capability);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -27,8 +27,6 @@
|
||||
import com.microsoft.azure.storage.blob.BlockListingFilter;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
@ -39,8 +37,9 @@
|
||||
import org.hamcrest.core.IsNot;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assume.assumeNotNull;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
|
||||
|
||||
/**
|
||||
* Test semantics of functions flush, hflush, hsync, and close for block blobs,
|
||||
@ -192,11 +191,14 @@ public void testPageBlobClose() throws IOException {
|
||||
public void testPageBlobCapabilities() throws IOException {
|
||||
Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
|
||||
try (FSDataOutputStream stream = fs.create(path)) {
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||
assertCapabilities(stream,
|
||||
new String[]{
|
||||
StreamCapabilities.HFLUSH,
|
||||
StreamCapabilities.HSYNC,
|
||||
StreamCapabilities.DROPBEHIND,
|
||||
StreamCapabilities.READAHEAD,
|
||||
StreamCapabilities.UNBUFFER},
|
||||
null);
|
||||
stream.write(getRandomBytes());
|
||||
}
|
||||
}
|
||||
@ -285,11 +287,12 @@ public void testBlockBlobClose() throws IOException {
|
||||
public void testBlockBlobCapabilities() throws IOException {
|
||||
Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
|
||||
try (FSDataOutputStream stream = fs.create(path)) {
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||
assertLacksStreamCapabilities(stream,
|
||||
StreamCapabilities.HFLUSH,
|
||||
StreamCapabilities.HSYNC,
|
||||
StreamCapabilities.DROPBEHIND,
|
||||
StreamCapabilities.READAHEAD,
|
||||
StreamCapabilities.UNBUFFER);
|
||||
stream.write(getRandomBytes());
|
||||
}
|
||||
}
|
||||
@ -381,11 +384,12 @@ public void testBlockBlobCompactionClose() throws IOException {
|
||||
public void testBlockBlobCompactionCapabilities() throws IOException {
|
||||
Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
|
||||
try (FSDataOutputStream stream = fs.create(path)) {
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||
assertHasStreamCapabilities(stream,
|
||||
StreamCapabilities.HFLUSH,
|
||||
StreamCapabilities.HSYNC,
|
||||
StreamCapabilities.DROPBEHIND,
|
||||
StreamCapabilities.READAHEAD,
|
||||
StreamCapabilities.UNBUFFER);
|
||||
stream.write(getRandomBytes());
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,9 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
|
||||
|
||||
/**
|
||||
* Test flush operation.
|
||||
* This class cannot be run in parallel test mode--check comments in
|
||||
@ -306,11 +309,12 @@ public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
|
||||
final Path testFilePath = path(methodName.getMethodName());
|
||||
|
||||
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||
assertLacksStreamCapabilities(stream,
|
||||
StreamCapabilities.HFLUSH,
|
||||
StreamCapabilities.HSYNC,
|
||||
StreamCapabilities.DROPBEHIND,
|
||||
StreamCapabilities.READAHEAD,
|
||||
StreamCapabilities.UNBUFFER);
|
||||
}
|
||||
}
|
||||
|
||||
@ -320,11 +324,12 @@ public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
|
||||
byte[] buffer = getRandomBytesArray();
|
||||
final Path testFilePath = path(methodName.getMethodName());
|
||||
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||
assertHasStreamCapabilities(stream,
|
||||
StreamCapabilities.HFLUSH,
|
||||
StreamCapabilities.HSYNC,
|
||||
StreamCapabilities.DROPBEHIND,
|
||||
StreamCapabilities.READAHEAD,
|
||||
StreamCapabilities.UNBUFFER);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,4 +66,20 @@
|
||||
<name>fs.contract.supports-unbuffer</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hflush</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-hsync</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.metadata_updated_on_hsync</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
Loading…
Reference in New Issue
Block a user