diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 3db3173922..59d95cf49a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -24,15 +24,22 @@ import java.io.InputStream; import java.nio.channels.ClosedChannelException; import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.LambdaUtils; import org.apache.hadoop.util.Progressable; /**************************************************************** @@ -484,6 +491,32 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, blockSize, progress); } + @Override + public FSDataOutputStream create(final Path f, + final FsPermission permission, + final EnumSet flags, + final int bufferSize, + final short replication, + final long blockSize, + final Progressable progress, + final Options.ChecksumOpt checksumOpt) throws IOException { + return create(f, permission, flags.contains(CreateFlag.OVERWRITE), + bufferSize, replication, blockSize, progress); + } + + @Override + public FSDataOutputStream createNonRecursive(final Path f, + final FsPermission permission, + final 
EnumSet flags, + final int bufferSize, + final short replication, + final long blockSize, + final Progressable progress) throws IOException { + return create(f, permission, flags.contains(CreateFlag.OVERWRITE), + false, bufferSize, replication, + blockSize, progress); + } + abstract class FsOperation { boolean run(Path p) throws IOException { boolean status = apply(p); @@ -780,4 +813,57 @@ public boolean reportChecksumFailure(Path f, FSDataInputStream in, long inPos, FSDataInputStream sums, long sumsPos) { return false; } + + /** + * This is overridden to ensure that this class's + * {@link #openFileWithOptions}() method is called, and so ultimately + * its {@link #open(Path, int)}. + * + * {@inheritDoc} + */ + @Override + public FutureDataInputStreamBuilder openFile(final Path path) + throws IOException, UnsupportedOperationException { + return ((FutureDataInputStreamBuilderImpl) + createDataInputStreamBuilder(this, path)).getThisBuilder(); + } + + /** + * Open the file as a blocking call to {@link #open(Path, int)}. + * + * {@inheritDoc} + */ + @Override + protected CompletableFuture openFileWithOptions( + final Path path, + final Set mandatoryKeys, + final Configuration options, + final int bufferSize) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + Collections.emptySet(), + "for " + path); + return LambdaUtils.eval( + new CompletableFuture<>(), () -> open(path, bufferSize)); + } + + /** + * This is overridden to ensure that this class's create() method is + * ultimately called. + * + * {@inheritDoc} + */ + public FSDataOutputStreamBuilder createFile(Path path) { + return createDataOutputStreamBuilder(this, path) + .create().overwrite(true); + } + + /** + * This is overridden to ensure that this class's create() method is + * ultimately called. 
+ * + * {@inheritDoc} + */ + public FSDataOutputStreamBuilder appendFile(Path path) { + return createDataOutputStreamBuilder(this, path).append(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 7e144e0f83..efb675cf6f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4240,14 +4240,34 @@ public static GlobalStorageStatistics getGlobalStorageStatistics() { return GlobalStorageStatistics.INSTANCE; } + /** + * Create instance of the standard FSDataOutputStreamBuilder for the + * given filesystem and path. + * @param fileSystem owner + * @param path path to create + * @return a builder. + */ + @InterfaceStability.Unstable + protected static FSDataOutputStreamBuilder createDataOutputStreamBuilder( + @Nonnull final FileSystem fileSystem, + @Nonnull final Path path) { + return new FileSystemDataOutputStreamBuilder(fileSystem, path); + } + + /** + * Standard implementation of the FSDataOutputStreamBuilder; invokes + * create/createNonRecursive or Append depending upon the options. + */ private static final class FileSystemDataOutputStreamBuilder extends FSDataOutputStreamBuilder { /** * Constructor. + * @param fileSystem owner + * @param p path to create */ - protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) { + private FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) { super(fileSystem, p); } @@ -4290,7 +4310,7 @@ public FileSystemDataOutputStreamBuilder getThisBuilder() { * builder interface becomes stable. 
*/ public FSDataOutputStreamBuilder createFile(Path path) { - return new FileSystemDataOutputStreamBuilder(this, path) + return createDataOutputStreamBuilder(this, path) .create().overwrite(true); } @@ -4300,7 +4320,7 @@ public FSDataOutputStreamBuilder createFile(Path path) { * @return a {@link FSDataOutputStreamBuilder} to build file append request. */ public FSDataOutputStreamBuilder appendFile(Path path) { - return new FileSystemDataOutputStreamBuilder(this, path).append(); + return createDataOutputStreamBuilder(this, path).append(); } /** @@ -4321,7 +4341,7 @@ public FSDataOutputStreamBuilder appendFile(Path path) { @InterfaceStability.Unstable public FutureDataInputStreamBuilder openFile(Path path) throws IOException, UnsupportedOperationException { - return new FSDataInputStreamBuilder(this, path).getThisBuilder(); + return createDataInputStreamBuilder(this, path).getThisBuilder(); } /** @@ -4340,7 +4360,7 @@ public FutureDataInputStreamBuilder openFile(Path path) @InterfaceStability.Unstable public FutureDataInputStreamBuilder openFile(PathHandle pathHandle) throws IOException, UnsupportedOperationException { - return new FSDataInputStreamBuilder(this, pathHandle) + return createDataInputStreamBuilder(this, pathHandle) .getThisBuilder(); } @@ -4416,6 +4436,36 @@ protected CompletableFuture openFileWithOptions( return result; } + /** + * Create instance of the standard {@link FSDataInputStreamBuilder} for the + * given filesystem and path. + * @param fileSystem owner + * @param path path to read + * @return a builder. + */ + @InterfaceAudience.LimitedPrivate("Filesystems") + @InterfaceStability.Unstable + protected static FSDataInputStreamBuilder createDataInputStreamBuilder( + @Nonnull final FileSystem fileSystem, + @Nonnull final Path path) { + return new FSDataInputStreamBuilder(fileSystem, path); + } + + /** + * Create instance of the standard {@link FSDataInputStreamBuilder} for the + * given filesystem and path handle. 
+ * @param fileSystem owner + * @param pathHandle path handle of file to open. + * @return a builder. + */ + @InterfaceAudience.LimitedPrivate("Filesystems") + @InterfaceStability.Unstable + protected static FSDataInputStreamBuilder createDataInputStreamBuilder( + @Nonnull final FileSystem fileSystem, + @Nonnull final PathHandle pathHandle) { + return new FSDataInputStreamBuilder(fileSystem, pathHandle); + } + /** * Builder returned for {@code #openFile(Path)} * and {@code #openFile(PathHandle)}. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java index b195be3297..cf0b84ca36 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java @@ -32,12 +32,14 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.QuotaUsage; @@ -464,4 +466,14 @@ public void unsetStoragePolicy(Path src) throws IOException { super.unsetStoragePolicy(fullPath(src)); } + @Override + public FSDataOutputStreamBuilder createFile(final Path path) { + return super.createFile(fullPath(path)); + } + + @Override + public FutureDataInputStreamBuilder openFile(final Path path) + throws IOException, UnsupportedOperationException { + 
return super.openFile(fullPath(path)); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java index fae3db83cf..bffcfa76f2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java @@ -35,10 +35,12 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows; @@ -774,4 +776,229 @@ public void testFSOutputStreamBuilderOptions() throws Exception { "unsupported key found", builder::build ); } + + private static final int CRC_SIZE = 12; + + private static final byte[] DATA = "1234567890".getBytes(); + + /** + * Get the statistics for the file schema. Contains assertions + * @return the statistics on all file:// IO. + */ + protected Statistics getFileStatistics() { + final List all = FileSystem.getAllStatistics(); + final List fileStats = all + .stream() + .filter(s -> s.getScheme().equals("file")) + .collect(Collectors.toList()); + assertEquals("Number of statistics counters for file://", + 1, fileStats.size()); + // this should be used for local and rawLocal, as they share the + // same schema (although their class is different) + return fileStats.get(0); + } + + /** + * Write the byte array {@link #DATA} to the given output stream. + * @param s stream to write to. 
+ * @throws IOException failure to write/close the file + */ + private void writeData(FSDataOutputStream s) throws IOException { + s.write(DATA); + s.close(); + } + + /** + * Evaluate the closure while counting bytes written during + * its execution, and verify that the count included the CRC + * write as well as the data. + * After the operation, the file is deleted. + * @param operation operation for assertion method. + * @param path path to write + * @param callable expression evaluated + * @param delete should the file be deleted after? + */ + private void assertWritesCRC(String operation, Path path, + LambdaTestUtils.VoidCallable callable, boolean delete) throws Exception { + final Statistics stats = getFileStatistics(); + final long bytesOut0 = stats.getBytesWritten(); + try { + callable.call(); + assertEquals("Bytes written in " + operation + "; stats=" + stats, + CRC_SIZE + DATA.length, stats.getBytesWritten() - bytesOut0); + } finally { + if (delete) { + // clean up + try { + fileSys.delete(path, false); + } catch (IOException ignored) { + // ignore this cleanup failure + } + } + } + } + + /** + * Verify that File IO through the classic non-builder APIs generate + * statistics which imply that CRCs were read and written. + */ + @Test + public void testCRCwithClassicAPIs() throws Throwable { + final Path file = new Path(TEST_ROOT_DIR, "testByteCountersClassicAPIs"); + assertWritesCRC("create()", + file, + () -> writeData(fileSys.create(file, true)), + false); + + final Statistics stats = getFileStatistics(); + final long bytesRead0 = stats.getBytesRead(); + fileSys.open(file).close(); + final long bytesRead1 = stats.getBytesRead(); + assertEquals("Bytes read in open() call with stats " + stats, + CRC_SIZE, bytesRead1 - bytesRead0); + } + + /** + * create/7 to use write the CRC. 
+ */ + @Test + public void testCRCwithCreate7() throws Throwable { + final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreate7"); + assertWritesCRC("create/7", + file, + () -> writeData( + fileSys.create(file, + FsPermission.getFileDefault(), + true, + 8192, + (short)1, + 16384, + null)), + true); + } + + /** + * Create with ChecksumOpt to create checksums. + * If the LocalFS ever interpreted the flag, this test may fail. + */ + @Test + public void testCRCwithCreateChecksumOpt() throws Throwable { + final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreateChecksumOpt"); + assertWritesCRC("create with checksum opt", + file, + () -> writeData( + fileSys.create(file, + FsPermission.getFileDefault(), + EnumSet.of(CreateFlag.CREATE), + 8192, + (short)1, + 16384, + null, + Options.ChecksumOpt.createDisabled())), + true); + } + + /** + * Create createNonRecursive/6. + */ + @Test + public void testCRCwithCreateNonRecursive6() throws Throwable { + fileSys.mkdirs(TEST_PATH); + final Path file = new Path(TEST_ROOT_DIR, + "testCRCwithCreateNonRecursive6"); + assertWritesCRC("create with checksum opt", + file, + () -> writeData( + fileSys.createNonRecursive(file, + FsPermission.getFileDefault(), + true, + 8192, + (short)1, + 16384, + null)), + true); + } + + /** + * Create createNonRecursive with CreateFlags. + */ + @Test + public void testCRCwithCreateNonRecursiveCreateFlags() throws Throwable { + fileSys.mkdirs(TEST_PATH); + final Path file = new Path(TEST_ROOT_DIR, + "testCRCwithCreateNonRecursiveCreateFlags"); + assertWritesCRC("create with checksum opt", + file, + () -> writeData( + fileSys.createNonRecursive(file, + FsPermission.getFileDefault(), + EnumSet.of(CreateFlag.CREATE), + 8192, + (short)1, + 16384, + null)), + true); + } + + + /** + * This relates to MAPREDUCE-7184, where the openFile() call's + * CRC count wasn't making into the statistics for the current thread. 
+ * If the evaluation was in a separate thread you'd expect that, + * but if the completable future is in fact being synchronously completed + * it should not happen. + */ + @Test + public void testReadIncludesCRCwithBuilders() throws Throwable { + + final Path file = new Path(TEST_ROOT_DIR, + "testReadIncludesCRCwithBuilders"); + Statistics stats = getFileStatistics(); + // write the file using the builder API + assertWritesCRC("createFile()", + file, + () -> writeData( + fileSys.createFile(file) + .overwrite(true).recursive() + .build()), + false); + + // now read back the data, again with the builder API + final long bytesRead0 = stats.getBytesRead(); + fileSys.openFile(file).build().get().close(); + assertEquals("Bytes read in openFile() call with stats " + stats, + CRC_SIZE, stats.getBytesRead() - bytesRead0); + // now write with overwrite = true + assertWritesCRC("createFileNonRecursive()", + file, + () -> { + try (FSDataOutputStream s = fileSys.createFile(file) + .overwrite(true) + .build()) { + s.write(DATA); + } + }, + true); + } + + /** + * Write with the builder, using the normal recursive create + * with create flags containing the overwrite option. 
+ */ + @Test + public void testWriteWithBuildersRecursive() throws Throwable { + + final Path file = new Path(TEST_ROOT_DIR, + "testWriteWithBuildersRecursive"); + Statistics stats = getFileStatistics(); + // write the file using the builder API + assertWritesCRC("createFile()", + file, + () -> writeData( + fileSys.createFile(file) + .overwrite(false) + .recursive() + .build()), + true); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestFutureIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestFutureIO.java new file mode 100644 index 0000000000..2e1270a1b8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestFutureIO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.test.HadoopTestBase; +import org.apache.hadoop.util.LambdaUtils; + +/** + * Test behavior of {@link FutureIOSupport}, especially "what thread do things + * happen in?". 
+ */
+public class TestFutureIO extends HadoopTestBase {
+
+  private ThreadLocal<AtomicInteger> local;  // each thread's copy starts at 1
+
+  @Before
+  public void setup() throws Exception {
+    local = ThreadLocal.withInitial(() -> new AtomicInteger(1));
+  }
+
+  /**
+   * Simple eval is blocking and executes in the same thread.
+   */
+  @Test
+  public void testEvalInCurrentThread() throws Throwable {
+    CompletableFuture<Integer> result = new CompletableFuture<>();
+    CompletableFuture<Integer> eval = LambdaUtils.eval(result,
+        () -> {
+          return getLocal().addAndGet(2);
+        });
+    assertEquals("Thread local value", 3, getLocalValue());  // bumped here
+    assertEquals("Evaluated Value", 3, eval.get().intValue());
+  }
+
+  /**
+   * A supply async call runs things in a shared thread pool.
+   */
+  @Test
+  public void testEvalAsync() throws Throwable {
+    final CompletableFuture<Integer> eval = CompletableFuture.supplyAsync(
+        () -> getLocal().addAndGet(2));
+    assertEquals("Thread local value", 1, getLocalValue());  // caller's copy
+    assertEquals("Evaluated Value", 3, eval.get().intValue());
+  }
+
+  // Accessors for the calling thread's own instance of the counter.
+  protected AtomicInteger getLocal() {
+    return local.get();
+  }
+
+  protected int getLocalValue() {
+    return local.get().get();
+  }
+}