diff --git a/dev-support/Jenkinsfile b/dev-support/Jenkinsfile index 5b6fcb68f8..9edd77b58e 100644 --- a/dev-support/Jenkinsfile +++ b/dev-support/Jenkinsfile @@ -23,7 +23,7 @@ pipeline { options { buildDiscarder(logRotator(numToKeepStr: '5')) - timeout (time: 24, unit: 'HOURS') + timeout (time: 48, unit: 'HOURS') timestamps() checkoutToSubdirectory('src') } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java index 59345f5d25..7f3171235c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,6 +22,9 @@ import java.io.FileDescriptor; import java.io.IOException; import java.util.StringJoiner; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -158,8 +161,24 @@ public IOStatistics getIOStatistics() { @Override public String toString() { return new StringJoiner(", ", - BufferedFSInputStream.class.getSimpleName() + "[", "]") - .add("in=" + in) - .toString(); + BufferedFSInputStream.class.getSimpleName() + "[", "]") + .add("in=" + in) + .toString(); + } + + @Override + public int minSeekForVectorReads() { + return ((PositionedReadable) in).minSeekForVectorReads(); + } + + @Override + public int maxReadSizeForVectorReads() { + return ((PositionedReadable) in).maxReadSizeForVectorReads(); + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + ((PositionedReadable) in).readVectored(ranges, allocate); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 2ae5eabd49..e612713cfe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -22,17 +22,25 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.IntFunction; +import java.util.zip.CRC32; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.VectoredReadUtils; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; @@ -66,7 +74,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem { public static double getApproxChkSumLength(long size) { return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; } - + public ChecksumFileSystem(FileSystem fs) { super(fs); } @@ -82,7 +90,7 @@ public void setConf(Configuration conf) { bytesPerChecksum); } } - + /** * Set whether to verify checksum. */ @@ -95,7 +103,7 @@ public void setVerifyChecksum(boolean verifyChecksum) { public void setWriteChecksum(boolean writeChecksum) { this.writeChecksum = writeChecksum; } - + /** get the raw file system */ @Override public FileSystem getRawFileSystem() { @@ -162,18 +170,18 @@ private static class ChecksumFSInputChecker extends FSInputChecker implements private ChecksumFileSystem fs; private FSDataInputStream datas; private FSDataInputStream sums; - + private static final int HEADER_LENGTH = 8; - + private int bytesPerSum = 1; - + public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) throws IOException { this(fs, file, fs.getConf().getInt( - LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, + LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT)); } - + public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) throws IOException { super( file, fs.getFileStatus(file).getReplication() ); @@ -189,7 +197,8 @@ public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) if (!Arrays.equals(version, CHECKSUM_VERSION)) throw new IOException("Not a checksum file: "+sumFile); this.bytesPerSum = sums.readInt(); - set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4); + set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, + FSInputChecker.CHECKSUM_SIZE); } catch (IOException e) { // mincing the message is terrible, but java throws permission // exceptions as FNF because that's all the method signatures allow! @@ -201,21 +210,21 @@ public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) set(fs.verifyChecksum, null, 1, 0); } } - + private long getChecksumFilePos( long dataPos ) { - return HEADER_LENGTH + 4*(dataPos/bytesPerSum); + return HEADER_LENGTH + FSInputChecker.CHECKSUM_SIZE*(dataPos/bytesPerSum); } - + @Override protected long getChunkPosition( long dataPos ) { return dataPos/bytesPerSum*bytesPerSum; } - + @Override public int available() throws IOException { return datas.available() + super.available(); } - + @Override public int read(long position, byte[] b, int off, int len) throws IOException { @@ -233,7 +242,7 @@ public int read(long position, byte[] b, int off, int len) } return nread; } - + @Override public void close() throws IOException { datas.close(); @@ -242,7 +251,7 @@ public void close() throws IOException { } set(fs.verifyChecksum, null, 1, 0); } - + @Override public boolean seekToNewSource(long targetPos) throws IOException { @@ -265,7 +274,7 @@ protected int readChunk(long pos, byte[] buf, int offset, int len, final int checksumsToRead = Math.min( len/bytesPerSum, // number of checksums based on len to read checksum.length / CHECKSUM_SIZE); // size of checksum buffer - long checksumPos = getChecksumFilePos(pos); + long checksumPos = getChecksumFilePos(pos); if(checksumPos != sums.getPos()) { sums.seek(checksumPos); } @@ -305,8 +314,129 @@ protected int readChunk(long pos, byte[] buf, int offset, int len, public IOStatistics getIOStatistics() { return IOStatisticsSupport.retrieveIOStatistics(datas); } + + public static long findChecksumOffset(long dataOffset, + int bytesPerSum) { + return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE; + } + + /** + * Find the checksum ranges that correspond to the given data ranges. + * @param dataRanges the input data ranges, which are assumed to be sorted + * and non-overlapping + * @return a list of AsyncReaderUtils.CombinedFileRange that correspond to + * the checksum ranges + */ + public static List findChecksumRanges( + List dataRanges, + int bytesPerSum, + int minSeek, + int maxSize) { + List result = new ArrayList<>(); + CombinedFileRange currentCrc = null; + for(FileRange range: dataRanges) { + long crcOffset = findChecksumOffset(range.getOffset(), bytesPerSum); + long crcEnd = findChecksumOffset(range.getOffset() + range.getLength() + + bytesPerSum - 1, bytesPerSum); + if (currentCrc == null || + !currentCrc.merge(crcOffset, crcEnd, range, minSeek, maxSize)) { + currentCrc = new CombinedFileRange(crcOffset, crcEnd, range); + result.add(currentCrc); + } + } + return result; + } + + /** + * Check the data against the checksums. + * @param sumsBytes the checksum data + * @param sumsOffset where from the checksum file this buffer started + * @param data the file data + * @param dataOffset where the file data started (must be a multiple of + * bytesPerSum) + * @param bytesPerSum how many bytes per a checksum + * @param file the path of the filename + * @return the data buffer + * @throws CompletionException if the checksums don't match + */ + static ByteBuffer checkBytes(ByteBuffer sumsBytes, + long sumsOffset, + ByteBuffer data, + long dataOffset, + int bytesPerSum, + Path file) { + // determine how many bytes we need to skip at the start of the sums + int offset = + (int) (findChecksumOffset(dataOffset, bytesPerSum) - sumsOffset); + IntBuffer sums = sumsBytes.asIntBuffer(); + sums.position(offset / FSInputChecker.CHECKSUM_SIZE); + ByteBuffer current = data.duplicate(); + int numChunks = data.remaining() / bytesPerSum; + CRC32 crc = new CRC32(); + // check each chunk to ensure they match + for(int c = 0; c < numChunks; ++c) { + // set the buffer position and the limit + current.limit((c + 1) * bytesPerSum); + current.position(c * bytesPerSum); + // compute the crc + crc.reset(); + crc.update(current); + int expected = sums.get(); + int calculated = (int) crc.getValue(); + + if (calculated != expected) { + // cast of c added to silence findbugs + long errPosn = dataOffset + (long) c * bytesPerSum; + throw new CompletionException(new ChecksumException( + "Checksum error: " + file + " at " + errPosn + + " exp: " + expected + " got: " + calculated, errPosn)); + } + } + // if everything matches, we return the data + return data; + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + // If the stream doesn't have checksums, just delegate. + VectoredReadUtils.validateVectoredReadRanges(ranges); + if (sums == null) { + datas.readVectored(ranges, allocate); + return; + } + int minSeek = minSeekForVectorReads(); + int maxSize = maxReadSizeForVectorReads(); + List dataRanges = + VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum, + minSeek, maxReadSizeForVectorReads()); + List checksumRanges = findChecksumRanges(dataRanges, + bytesPerSum, minSeek, maxSize); + sums.readVectored(checksumRanges, allocate); + datas.readVectored(dataRanges, allocate); + // Data read is correct. I have verified content of dataRanges. + // There is some bug below here as test (testVectoredReadMultipleRanges) + // is failing, should be + // somewhere while slicing the merged data into smaller user ranges. + // Spend some time figuring out but it is a complex code. + for(CombinedFileRange checksumRange: checksumRanges) { + for(FileRange dataRange: checksumRange.getUnderlying()) { + // when we have both the ranges, validate the checksum + CompletableFuture result = + checksumRange.getData().thenCombineAsync(dataRange.getData(), + (sumBuffer, dataBuffer) -> + checkBytes(sumBuffer, checksumRange.getOffset(), + dataBuffer, dataRange.getOffset(), bytesPerSum, file)); + // Now, slice the read data range to the user's ranges + for(FileRange original: ((CombinedFileRange) dataRange).getUnderlying()) { + original.setData(result.thenApply( + (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original))); + } + } + } + } } - + private static class FSDataBoundedInputStream extends FSDataInputStream { private FileSystem fs; private Path file; @@ -317,12 +447,12 @@ private static class FSDataBoundedInputStream extends FSDataInputStream { this.fs = fs; this.file = file; } - + @Override public boolean markSupported() { return false; } - + /* Return the file length */ private long getFileLength() throws IOException { if( fileLen==-1L ) { @@ -330,7 +460,7 @@ private long getFileLength() throws IOException { } return fileLen; } - + /** * Skips over and discards n bytes of data from the * input stream. @@ -354,11 +484,11 @@ public synchronized long skip(long n) throws IOException { } return super.skip(n); } - + /** * Seek to the given position in the stream. * The next read() will be from that position. - * + * *

This method does not allow seek past the end of the file. * This produces IOException. * @@ -424,22 +554,22 @@ public void concat(final Path f, final Path[] psrcs) throws IOException { */ public static long getChecksumLength(long size, int bytesPerSum) { //the checksum length is equal to size passed divided by bytesPerSum + - //bytes written in the beginning of the checksum file. - return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + - CHECKSUM_VERSION.length + 4; + //bytes written in the beginning of the checksum file. + return ((size + bytesPerSum - 1) / bytesPerSum) * FSInputChecker.CHECKSUM_SIZE + + ChecksumFSInputChecker.HEADER_LENGTH; } /** This class provides an output stream for a checksummed file. * It generates checksums for data. */ private static class ChecksumFSOutputSummer extends FSOutputSummer implements IOStatisticsSource, StreamCapabilities { - private FSDataOutputStream datas; + private FSDataOutputStream datas; private FSDataOutputStream sums; private static final float CHKSUM_AS_FRACTION = 0.01f; private boolean isClosed = false; - - public ChecksumFSOutputSummer(ChecksumFileSystem fs, - Path file, + + ChecksumFSOutputSummer(ChecksumFileSystem fs, + Path file, boolean overwrite, int bufferSize, short replication, @@ -460,7 +590,7 @@ public ChecksumFSOutputSummer(ChecksumFileSystem fs, sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); sums.writeInt(bytesPerSum); } - + @Override public void close() throws IOException { try { @@ -471,7 +601,7 @@ public void close() throws IOException { isClosed = true; } } - + @Override protected void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen) @@ -727,7 +857,7 @@ public boolean rename(Path src, Path dst) throws IOException { value = fs.rename(srcCheckFile, dstCheckFile); } else if (fs.exists(dstCheckFile)) { // no src checksum, so remove dst checksum - value = fs.delete(dstCheckFile, true); + value = fs.delete(dstCheckFile, true); } return value; @@ -759,7 +889,7 @@ public boolean delete(Path f, boolean recursive) throws IOException{ return fs.delete(f, true); } } - + final private static PathFilter DEFAULT_FILTER = new PathFilter() { @Override public boolean accept(Path file) { @@ -770,7 +900,7 @@ public boolean accept(Path file) { /** * List the statuses of the files/directories in the given path if the path is * a directory. - * + * * @param f * given path * @return the statuses of the files/directories in the given path @@ -791,7 +921,7 @@ public RemoteIterator listStatusIterator(final Path p) /** * List the statuses of the files/directories in the given path if the path is * a directory. - * + * * @param f * given path * @return the statuses of the files/directories in the given patch @@ -802,7 +932,7 @@ public RemoteIterator listLocatedStatus(Path f) throws IOException { return fs.listLocatedStatus(f, DEFAULT_FILTER); } - + @Override public boolean mkdirs(Path f) throws IOException { return fs.mkdirs(f); @@ -856,7 +986,7 @@ public void copyToLocalFile(Path src, Path dst, boolean copyCrc) } else { FileStatus[] srcs = listStatus(src); for (FileStatus srcFile : srcs) { - copyToLocalFile(srcFile.getPath(), + copyToLocalFile(srcFile.getPath(), new Path(dst, srcFile.getPath().getName()), copyCrc); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index b143a4cb63..52644402ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -26,6 +26,8 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.EnumSet; +import java.util.List; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -51,7 +53,7 @@ public class FSDataInputStream extends DataInputStream */ private final IdentityHashStore extendedReadBuffers - = new IdentityHashStore(0); + = new IdentityHashStore<>(0); public FSDataInputStream(InputStream in) { super(in); @@ -279,4 +281,20 @@ public void readFully(long position, ByteBuffer buf) throws IOException { public IOStatistics getIOStatistics() { return IOStatisticsSupport.retrieveIOStatistics(in); } + + @Override + public int minSeekForVectorReads() { + return ((PositionedReadable) in).minSeekForVectorReads(); + } + + @Override + public int maxReadSizeForVectorReads() { + return ((PositionedReadable) in).maxReadSizeForVectorReads(); + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + ((PositionedReadable) in).readVectored(ranges, allocate); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java new file mode 100644 index 0000000000..7388e462cc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * A byte range of a file. + * This is used for the asynchronous gather read API of + * {@link PositionedReadable#readVectored}. + */ +public interface FileRange { + + /** + * Get the starting offset of the range. + * @return the byte offset of the start + */ + long getOffset(); + + /** + * Get the length of the range. + * @return the number of bytes in the range. + */ + int getLength(); + + /** + * Get the future data for this range. + * @return the future for the {@link ByteBuffer} that contains the data + */ + CompletableFuture getData(); + + /** + * Set a future for this range's data. + * This method is called by {@link PositionedReadable#readVectored} to store the + * data for the user to pick up later via {@link #getData}. + * @param data the future of the ByteBuffer that will have the data + */ + void setData(CompletableFuture data); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java new file mode 100644 index 0000000000..ef5851154b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * A range of bytes from a file with an optional buffer to read those bytes + * for zero copy. + */ +public class FileRangeImpl implements FileRange { + private long offset; + private int length; + private CompletableFuture reader; + + public FileRangeImpl(long offset, int length) { + this.offset = offset; + this.length = length; + } + + @Override + public String toString() { + return "range[" + offset + "," + (offset + length) + ")"; + } + + @Override + public long getOffset() { + return offset; + } + + @Override + public int getLength() { + return length; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public void setLength(int length) { + this.length = length; + } + + @Override + public void setData(CompletableFuture pReader) { + this.reader = pReader; + } + + @Override + public CompletableFuture getData() { + return reader; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 6744d17a72..7e543ebf22 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,10 +17,15 @@ */ package org.apache.hadoop.fs; -import java.io.*; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.impl.VectoredReadUtils; /** * Stream that permits positional reading. @@ -85,4 +90,38 @@ void readFully(long position, byte[] buffer, int offset, int length) * the read operation completed */ void readFully(long position, byte[] buffer) throws IOException; + + /** + * What is the smallest reasonable seek? + * @return the minimum number of bytes + */ + default int minSeekForVectorReads() { + return 4 * 1024; + } + + /** + * What is the largest size that we should group ranges together as? + * @return the number of bytes to read at once + */ + default int maxReadSizeForVectorReads() { + return 1024 * 1024; + } + + /** + * Read fully a list of file ranges asynchronously from this file. + * The default iterates through the ranges to read each synchronously, but + * the intent is that FSDataInputStream subclasses can make more efficient + * readers. + * As a result of the call, each range will have FileRange.setData(CompletableFuture) + * called with a future that when complete will have a ByteBuffer with the + * data from the file's range. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @throws IOException any IOE. + */ + default void readVectored(List ranges, + IntFunction allocate) throws IOException { + VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(), + maxReadSizeForVectorReads()); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 86e42f5d7c..c71692e67e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ package org.apache.hadoop.fs; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.impl.VectoredReadUtils; import java.io.BufferedOutputStream; import java.io.DataOutput; @@ -33,8 +34,11 @@ import java.io.FileDescriptor; import java.net.URI; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributeView; import java.nio.file.attribute.FileTime; @@ -44,6 +48,9 @@ import java.util.Optional; import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicLong; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -130,7 +137,9 @@ public void initialize(URI uri, Configuration conf) throws IOException { class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor, IOStatisticsSource, StreamCapabilities { private FileInputStream fis; + private final File name; private long position; + private AsynchronousFileChannel asyncChannel = null; /** * Minimal set of counters. @@ -148,7 +157,8 @@ class LocalFSFileInputStream extends FSInputStream implements private final AtomicLong bytesRead; public LocalFSFileInputStream(Path f) throws IOException { - fis = new FileInputStream(pathToFile(f)); + name = pathToFile(f); + fis = new FileInputStream(name); bytesRead = ioStatistics.getCounterReference( STREAM_READ_BYTES); } @@ -179,10 +189,16 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Override public int available() throws IOException { return fis.available(); } @Override - public void close() throws IOException { fis.close(); } - @Override public boolean markSupported() { return false; } - + + @Override + public void close() throws IOException { + fis.close(); + if (asyncChannel != null) { + asyncChannel.close(); + } + } + @Override public int read() throws IOException { try { @@ -272,8 +288,88 @@ public boolean hasCapability(String capability) { public IOStatistics getIOStatistics() { return ioStatistics; } + + AsynchronousFileChannel getAsyncChannel() throws IOException { + if (asyncChannel == null) { + synchronized (this) { + asyncChannel = AsynchronousFileChannel.open(name.toPath(), + StandardOpenOption.READ); + } + } + return asyncChannel; + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + + // Set up all of the futures, so that we can use them if things fail + for(FileRange range: ranges) { + VectoredReadUtils.validateRangeRequest(range); + range.setData(new CompletableFuture<>()); + } + try { + AsynchronousFileChannel channel = getAsyncChannel(); + ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; + AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); + for(int i = 0; i < ranges.size(); ++i) { + FileRange range = ranges.get(i); + buffers[i] = allocate.apply(range.getLength()); + channel.read(buffers[i], range.getOffset(), i, asyncHandler); + } + } catch (IOException ioe) { + LOG.debug("Exception occurred during vectored read ", ioe); + for(FileRange range: ranges) { + range.getData().completeExceptionally(ioe); + } + } + } } - + + /** + * A CompletionHandler that implements readFully and translates back + * into the form of CompletionHandler that our users expect. + */ + static class AsyncHandler implements CompletionHandler { + private final AsynchronousFileChannel channel; + private final List ranges; + private final ByteBuffer[] buffers; + + AsyncHandler(AsynchronousFileChannel channel, + List ranges, + ByteBuffer[] buffers) { + this.channel = channel; + this.ranges = ranges; + this.buffers = buffers; + } + + @Override + public void completed(Integer result, Integer r) { + FileRange range = ranges.get(r); + ByteBuffer buffer = buffers[r]; + if (result == -1) { + failed(new EOFException("Read past End of File"), r); + } else { + if (buffer.remaining() > 0) { + // issue a read for the rest of the buffer + // QQ: What if this fails? It has the same handler. + channel.read(buffer, range.getOffset() + buffer.position(), r, this); + } else { + // QQ: Why is this required? I think because we don't want the + // user to read data beyond limit. + buffer.flip(); + range.getData().complete(buffer); + } + } + } + + @Override + public void failed(Throwable exc, Integer r) { + LOG.debug("Failed while reading range " + r + " {} ", exc); + ranges.get(r).getData().completeExceptionally(exc); + } + } + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { getFileStatus(f); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java new file mode 100644 index 0000000000..828a50b4f7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; + +import java.util.ArrayList; +import java.util.List; + +/** + * A file range that represents a set of underlying file ranges. + * This is used when we combine the user's FileRange objects + * together into a single read for efficiency. + */ +public class CombinedFileRange extends FileRangeImpl { + private ArrayList underlying = new ArrayList<>(); + + public CombinedFileRange(long offset, long end, FileRange original) { + super(offset, (int) (end - offset)); + this.underlying.add(original); + } + + /** + * Get the list of ranges that were merged together to form this one. + * @return the list of input ranges + */ + public List getUnderlying() { + return underlying; + } + + /** + * Merge this input range into the current one, if it is compatible. + * It is assumed that otherOffset is greater or equal the current offset, + * which typically happens by sorting the input ranges on offset. + * @param otherOffset the offset to consider merging + * @param otherEnd the end to consider merging + * @param other the underlying FileRange to add if we merge + * @param minSeek the minimum distance that we'll seek without merging the + * ranges together + * @param maxSize the maximum size that we'll merge into a single range + * @return true if we have merged the range into this one + */ + public boolean merge(long otherOffset, long otherEnd, FileRange other, + int minSeek, int maxSize) { + long end = this.getOffset() + this.getLength(); + long newEnd = Math.max(end, otherEnd); + if (otherOffset - end >= minSeek || newEnd - this.getOffset() > maxSize) { + return false; + } + this.setLength((int) (newEnd - this.getOffset())); + underlying.add(other); + return true; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java new file mode 100644 index 0000000000..9a16e6841d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.util.Preconditions; + +/** + * Utility class which implements helper methods used + * in vectored IO implementation. + */ +public final class VectoredReadUtils { + + /** + * Validate a single range. + * @param range file range. + * @throws EOFException any EOF Exception. + */ + public static void validateRangeRequest(FileRange range) + throws EOFException { + + Preconditions.checkArgument(range.getLength() >= 0, "length is negative"); + if (range.getOffset() < 0) { + throw new EOFException("position is negative"); + } + } + + /** + * Validate a list of vectored read ranges. + * @param ranges list of ranges. + * @throws EOFException any EOF exception. + */ + public static void validateVectoredReadRanges(List ranges) + throws EOFException { + for (FileRange range : ranges) { + validateRangeRequest(range); + } + } + + + + /** + * Read fully a list of file ranges asynchronously from this file. + * The default iterates through the ranges to read each synchronously, but + * the intent is that subclasses can make more efficient readers. + * The data or exceptions are pushed into {@link FileRange#getData()}. + * @param stream the stream to read the data from + * @param ranges the byte ranges to read + * @param allocate the byte buffer allocation + * @param minimumSeek the minimum number of bytes to seek over + * @param maximumRead the largest number of bytes to combine into a single read + */ + public static void readVectored(PositionedReadable stream, + List ranges, + IntFunction allocate, + int minimumSeek, + int maximumRead) { + if (isOrderedDisjoint(ranges, 1, minimumSeek)) { + for(FileRange range: ranges) { + range.setData(readRangeFrom(stream, range, allocate)); + } + } else { + for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek, + maximumRead)) { + CompletableFuture read = + readRangeFrom(stream, range, allocate); + for(FileRange child: range.getUnderlying()) { + child.setData(read.thenApply( + (b) -> sliceTo(b, range.getOffset(), child))); + } + } + } + } + + /** + * Synchronously reads a range from the stream dealing with the combinations + * of ByteBuffers buffers and PositionedReadable streams. + * @param stream the stream to read from + * @param range the range to read + * @param allocate the function to allocate ByteBuffers + * @return the CompletableFuture that contains the read data + */ + public static CompletableFuture readRangeFrom(PositionedReadable stream, + FileRange range, + IntFunction allocate) { + CompletableFuture result = new CompletableFuture<>(); + try { + ByteBuffer buffer = allocate.apply(range.getLength()); + if (stream instanceof ByteBufferPositionedReadable) { + ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(), + buffer); + buffer.flip(); + } else { + readNonByteBufferPositionedReadable(stream, range, buffer); + } + result.complete(buffer); + } catch (IOException ioe) { + result.completeExceptionally(ioe); + } + return result; + } + + private static void readNonByteBufferPositionedReadable(PositionedReadable stream, + FileRange range, + ByteBuffer buffer) throws IOException { + if (buffer.isDirect()) { + buffer.put(readInDirectBuffer(stream, range)); + buffer.flip(); + } else { + stream.readFully(range.getOffset(), buffer.array(), + buffer.arrayOffset(), range.getLength()); + } + } + + private static byte[] readInDirectBuffer(PositionedReadable stream, + FileRange range) throws IOException { + // if we need to read data from a direct buffer and the stream doesn't + // support it, we allocate a byte array to use. + byte[] tmp = new byte[range.getLength()]; + stream.readFully(range.getOffset(), tmp, 0, tmp.length); + return tmp; + } + + /** + * Is the given input list. + *

    + *
  • already sorted by offset
  • + *
  • each range is more than minimumSeek apart
  • + *
  • the start and end of each range is a multiple of chunkSize
  • + *
+ * + * @param input the list of input ranges. + * @param chunkSize the size of the chunks that the offset and end must align to. + * @param minimumSeek the minimum distance between ranges. + * @return true if we can use the input list as is. + */ + public static boolean isOrderedDisjoint(List input, + int chunkSize, + int minimumSeek) { + long previous = -minimumSeek; + for(FileRange range: input) { + long offset = range.getOffset(); + long end = range.getOffset() + range.getLength(); + if (offset % chunkSize != 0 || + end % chunkSize != 0 || + (offset - previous < minimumSeek)) { + return false; + } + previous = end; + } + return true; + } + + /** + * Calculates floor value of offset based on chunk size. + * @param offset file offset. + * @param chunkSize file chunk size. + * @return floor value. + */ + public static long roundDown(long offset, int chunkSize) { + if (chunkSize > 1) { + return offset - (offset % chunkSize); + } else { + return offset; + } + } + + /** + * Calculates the ceil value of offset based on chunk size. + * @param offset file offset. + * @param chunkSize file chunk size. + * @return ceil value. + */ + public static long roundUp(long offset, int chunkSize) { + if (chunkSize > 1) { + long next = offset + chunkSize - 1; + return next - (next % chunkSize); + } else { + return offset; + } + } + + /** + * Sort and merge ranges to optimize the access from the underlying file + * system. + * The motivations are that: + *
    + *
  • Upper layers want to pass down logical file ranges.
  • + *
  • Fewer reads have better performance.
  • + *
  • Applications want callbacks as ranges are read.
  • + *
  • Some file systems want to round ranges to be at checksum boundaries.
  • + *
+ * + * @param input the list of input ranges + * @param chunkSize round the start and end points to multiples of chunkSize + * @param minimumSeek the smallest gap that we should seek over in bytes + * @param maxSize the largest combined file range in bytes + * @return the list of sorted CombinedFileRanges that cover the input + */ + public static List sortAndMergeRanges(List input, + int chunkSize, + int minimumSeek, + int maxSize) { + // sort the ranges by offset + FileRange[] ranges = input.toArray(new FileRange[0]); + Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset)); + CombinedFileRange current = null; + List result = new ArrayList<>(ranges.length); + + // now merge together the ones that merge + for(FileRange range: ranges) { + long start = roundDown(range.getOffset(), chunkSize); + long end = roundUp(range.getOffset() + range.getLength(), chunkSize); + if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { + current = new CombinedFileRange(start, end, range); + result.add(current); + } + } + return result; + } + + /** + * Slice the data that was read to the user's request. + * This function assumes that the user's request is completely subsumed by the + * read data. This always creates a new buffer pointing to the same underlying + * data but with its own mark and position fields such that reading one buffer + * can't effect other's mark and position. + * @param readData the buffer with the readData + * @param readOffset the offset in the file for the readData + * @param request the user's request + * @return the readData buffer that is sliced to the user's request + */ + public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset, + FileRange request) { + int offsetChange = (int) (request.getOffset() - readOffset); + int requestLength = request.getLength(); + readData = readData.slice(); + readData.position(offsetChange); + readData.limit(offsetChange + requestLength); + return readData; + } + + /** + * private constructor. + */ + private VectoredReadUtils() { + throw new UnsupportedOperationException(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 090696483b..0fe1772d26 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -443,6 +443,37 @@ The semantics of this are exactly equivalent to That is, the buffer is filled entirely with the contents of the input source from position `position` +### `default void readVectored(List ranges, IntFunction allocate)` + +Read fully data for a list of ranges asynchronously. The default implementation +iterates through the ranges, tries to coalesce the ranges based on values of +`minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged +ranges synchronously, but the intent is sub classes can implement efficient +implementation. + +#### Preconditions + +For each requested range: + + range.getOffset >= 0 else raise IllegalArgumentException + range.getLength >= 0 else raise EOFException + +#### Postconditions + +For each requested range: + + range.getData() returns CompletableFuture which will have data + from range.getOffset to range.getLength. + +### `minSeekForVectorReads()` + +Smallest reasonable seek. Two ranges won't be merged together if the difference between +end of first and start of next range is more than this value. + +### `maxReadSizeForVectorReads()` + +Maximum number of bytes which can be read in one go after merging the ranges. +Two ranges won't be merged if the combined data to be read is more than this value. ## Consistency diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java new file mode 100644 index 0000000000..eee4b11e73 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; + +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; + +@RunWith(Parameterized.class) +public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); + + public static final int DATASET_LEN = 64 * 1024; + private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; + private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt"; + private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256); + + private final IntFunction allocate; + + private final String bufferType; + + @Parameterized.Parameters(name = "Buffer type : {0}") + public static List params() { + return Arrays.asList("direct", "array"); + } + + public AbstractContractVectoredReadTest(String bufferType) { + this.bufferType = bufferType; + this.allocate = "array".equals(bufferType) ? + ByteBuffer::allocate : ByteBuffer::allocateDirect; + } + + @Override + public void setup() throws Exception { + super.setup(); + Path path = path(VECTORED_READ_FILE_NAME); + FileSystem fs = getFileSystem(); + createFile(fs, path, true, DATASET); + Path bigFile = path(VECTORED_READ_FILE_1MB_NAME); + createFile(fs, bigFile, true, DATASET_MB); + } + + @Test + public void testVectoredReadMultipleRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + FileRange fileRange = new FileRangeImpl(i * 100, 100); + fileRanges.add(fileRange); + } + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (FileRange res : fileRanges) { + completableFutures[i++] = res.getData(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + combinedFuture.get(); + + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testVectoredReadAndReadFully() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(100, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + byte[] readFullRes = new byte[100]; + in.readFully(100, readFullRes); + ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData()); + Assertions.assertThat(vecRes) + .describedAs("Result from vectored read and readFully must match") + .isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); + } + } + + /** + * As the minimum seek value is 4*1024,none of the below ranges + * will get merged. + */ + @Test + public void testDisjointRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, 100)); + fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100)); + fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + /** + * As the minimum seek value is 4*1024, all the below ranges + * will get merged into one. + */ + @Test + public void testAllRangesMergedIntoOne() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, 100)); + fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100)); + fileRanges.add(new FileRangeImpl(8*1024 - 101, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + /** + * As the minimum seek value is 4*1024, the first three ranges will be + * merged into and other two will remain as it is. + */ + @Test + public void testSomeRangesMergedSomeUnmerged() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(8*1024, 100)); + fileRanges.add(new FileRangeImpl(14*1024, 100)); + fileRanges.add(new FileRangeImpl(10*1024, 100)); + fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100)); + fileRanges.add(new FileRangeImpl(40*1024, 1024)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + public void testSameRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(8*1024, 1000)); + fileRanges.add(new FileRangeImpl(8*1024, 1000)); + fileRanges.add(new FileRangeImpl(8*1024, 1000)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testVectoredRead1MBFile() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(1293, 25837)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_1MB_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData()); + FileRange resRange = fileRanges.get(0); + assertDatasetEquals((int) resRange.getOffset(), "vecRead", + vecRes, resRange.getLength(), DATASET_MB); + } + } + + @Test + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, 1000)); + fileRanges.add(new FileRangeImpl(90, 900)); + fileRanges.add(new FileRangeImpl(50, 900)); + fileRanges.add(new FileRangeImpl(10, 980)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + for (FileRange res : fileRanges) { + CompletableFuture data = res.getData(); + try { + ByteBuffer buffer = data.get(); + // Shouldn't reach here. + Assert.fail("EOFException must be thrown while reading EOF"); + } catch (ExecutionException ex) { + // ignore as expected. + } catch (Exception ex) { + LOG.error("Exception while running vectored read ", ex); + Assert.fail("Exception while running vectored read " + ex); + } + } + } + } + + @Test + public void testNegativeLengthRange() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, -50)); + testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + } + + @Test + public void testNegativeOffsetRange() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(-1, 50)); + testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + } + + @Test + public void testNormalReadAfterVectoredRead() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSomeOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + Assertions.assertThat(in.getPos()) + .describedAs("Vectored read shouldn't change file pointer.") + .isEqualTo(200); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testVectoredReadAfterNormalRead() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSomeOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + Assertions.assertThat(in.getPos()) + .describedAs("Vectored read shouldn't change file pointer.") + .isEqualTo(200); + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testMultipleVectoredReads() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges1 = createSomeOverlappingRanges(); + List fileRanges2 = createSomeOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges1, allocate); + in.readVectored(fileRanges2, allocate); + validateVectoredReadResult(fileRanges2); + validateVectoredReadResult(fileRanges1); + } + } + + protected List createSomeOverlappingRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, 100)); + fileRanges.add(new FileRangeImpl(90, 50)); + return fileRanges; + } + + protected void validateVectoredReadResult(List fileRanges) + throws ExecutionException, InterruptedException { + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (FileRange res : fileRanges) { + completableFutures[i++] = res.getData(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + combinedFuture.get(); + + for (FileRange res : fileRanges) { + CompletableFuture data = res.getData(); + try { + ByteBuffer buffer = FutureIOSupport.awaitFuture(data); + assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET); + } catch (Exception ex) { + LOG.error("Exception while running vectored read ", ex); + Assert.fail("Exception while running vectored read " + ex); + } + } + } + + protected void testExceptionalVectoredRead(FileSystem fs, + List fileRanges, + String s) throws IOException { + boolean exRaised = false; + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + // Can we intercept here as done in S3 tests ?? + in.readVectored(fileRanges, allocate); + } catch (EOFException | IllegalArgumentException ex) { + // expected. + exRaised = true; + } + Assertions.assertThat(exRaised) + .describedAs(s) + .isTrue(); + } + + /** + * Assert that the data read matches the dataset at the given offset. + * This helps verify that the seek process is moving the read pointer + * to the correct location in the file. + * @param readOffset the offset in the file where the read began. + * @param operation operation name for the assertion. + * @param data data read in. + * @param length length of data to check. + * @param originalData + */ + private void assertDatasetEquals( + final int readOffset, final String operation, + final ByteBuffer data, + int length, byte[] originalData) { + for (int i = 0; i < length; i++) { + int o = readOffset + i; + assertEquals(operation + " with read offset " + readOffset + + ": data[" + i + "] != DATASET[" + o + "]", + originalData[o], data.get()); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java new file mode 100644 index 0000000000..099e3b946d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.localfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest { + + public TestLocalFSContractVectoredRead(String bufferType) { + super(bufferType); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java new file mode 100644 index 0000000000..cbb31ffe27 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.rawlocal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestRawLocalContractVectoredRead extends AbstractContractVectoredReadTest { + + public TestRawLocalContractVectoredRead(String bufferType) { + super(bufferType); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RawlocalFSContract(conf); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java new file mode 100644 index 0000000000..f789f36190 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; +import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; + +/** + * Test behavior of {@link VectoredReadUtils}. + */ +public class TestVectoredReadUtils extends HadoopTestBase { + + @Test + public void testSliceTo() { + final int size = 64 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(size); + // fill the buffer with data + IntBuffer intBuffer = buffer.asIntBuffer(); + for(int i=0; i < size / Integer.BYTES; ++i) { + intBuffer.put(i); + } + // ensure we don't make unnecessary slices + ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, + new FileRangeImpl(100, size)); + Assertions.assertThat(buffer) + .describedAs("Slicing on the same offset shouldn't " + + "create a new buffer") + .isEqualTo(slice); + + // try slicing a range + final int offset = 100; + final int sliceStart = 1024; + final int sliceLength = 16 * 1024; + slice = VectoredReadUtils.sliceTo(buffer, offset, + new FileRangeImpl(offset + sliceStart, sliceLength)); + // make sure they aren't the same, but use the same backing data + Assertions.assertThat(buffer) + .describedAs("Slicing on new offset should " + + "create a new buffer") + .isNotEqualTo(slice); + Assertions.assertThat(buffer.array()) + .describedAs("Slicing should use the same underlying " + + "data") + .isEqualTo(slice.array()); + // test the contents of the slice + intBuffer = slice.asIntBuffer(); + for(int i=0; i < sliceLength / Integer.BYTES; ++i) { + assertEquals("i = " + i, i + sliceStart / Integer.BYTES, intBuffer.get()); + } + } + + @Test + public void testRounding() { + for(int i=5; i < 10; ++i) { + assertEquals("i = "+ i, 5, VectoredReadUtils.roundDown(i, 5)); + assertEquals("i = "+ i, 10, VectoredReadUtils.roundUp(i+1, 5)); + } + assertEquals("Error while roundDown", 13, VectoredReadUtils.roundDown(13, 1)); + assertEquals("Error while roundUp", 13, VectoredReadUtils.roundUp(13, 1)); + } + + @Test + public void testMerge() { + FileRange base = new FileRangeImpl(2000, 1000); + CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); + + // test when the gap between is too big + assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2000, 4000)); + assertEquals("Number of ranges in merged range shouldn't increase", + 1, mergeBase.getUnderlying().size()); + assertEquals("post merge offset", 2000, mergeBase.getOffset()); + assertEquals("post merge length", 1000, mergeBase.getLength()); + + // test when the total size gets exceeded + assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 3999)); + assertEquals("Number of ranges in merged range shouldn't increase", + 1, mergeBase.getUnderlying().size()); + assertEquals("post merge offset", 2000, mergeBase.getOffset()); + assertEquals("post merge length", 1000, mergeBase.getLength()); + + // test when the merge works + assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 4000)); + assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); + assertEquals("post merge offset", 2000, mergeBase.getOffset()); + assertEquals("post merge length", 4000, mergeBase.getLength()); + + // reset the mergeBase and test with a 10:1 reduction + mergeBase = new CombinedFileRange(200, 300, base); + assertEquals(200, mergeBase.getOffset()); + assertEquals(100, mergeBase.getLength()); + assertTrue("ranges should get merged ", mergeBase.merge(500, 600, + new FileRangeImpl(5000, 1000), 201, 400)); + assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); + assertEquals("post merge offset", 200, mergeBase.getOffset()); + assertEquals("post merge length", 400, mergeBase.getLength()); + } + + @Test + public void testSortAndMerge() { + List input = Arrays.asList( + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100), + new FileRangeImpl(1000, 100) + ); + assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + List outputList = VectoredReadUtils.sortAndMergeRanges( + input, 100, 1001, 2500); + assertEquals("merged range size", 1, outputList.size()); + CombinedFileRange output = outputList.get(0); + assertEquals("merged range underlying size", 3, output.getUnderlying().size()); + assertEquals("range[1000,3100)", output.toString()); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); + + // the minSeek doesn't allow the first two to merge + assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100); + assertEquals("merged range size", 2, outputList.size()); + assertEquals("range[1000,1100)", outputList.get(0).toString()); + assertEquals("range[2100,3100)", outputList.get(1).toString()); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); + + // the maxSize doesn't allow the third range to merge + assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099); + assertEquals("merged range size", 2, outputList.size()); + assertEquals("range[1000,2200)", outputList.get(0).toString()); + assertEquals("range[3000,3100)", outputList.get(1).toString()); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); + + // test the round up and round down (the maxSize doesn't allow any merges) + assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); + outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100); + assertEquals("merged range size", 3, outputList.size()); + assertEquals("range[992,1104)", outputList.get(0).toString()); + assertEquals("range[2096,2208)", outputList.get(1).toString()); + assertEquals("range[2992,3104)", outputList.get(2).toString()); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700)); + } + + @Test + public void testSortAndMergeMoreCases() throws Exception { + List input = Arrays.asList( + new FileRangeImpl(3000, 110), + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100), + new FileRangeImpl(1000, 100) + ); + assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + List outputList = VectoredReadUtils.sortAndMergeRanges( + input, 1, 1001, 2500); + assertEquals("merged range size", 1, outputList.size()); + CombinedFileRange output = outputList.get(0); + assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + assertEquals("range[1000,3110)", output.toString()); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); + + outputList = VectoredReadUtils.sortAndMergeRanges( + input, 100, 1001, 2500); + assertEquals("merged range size", 1, outputList.size()); + output = outputList.get(0); + assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + assertEquals("range[1000,3200)", output.toString()); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); + + } + interface Stream extends PositionedReadable, ByteBufferPositionedReadable { + // nothing + } + + static void fillBuffer(ByteBuffer buffer) { + byte b = 0; + while (buffer.remaining() > 0) { + buffer.put(b++); + } + } + + @Test + public void testReadRangeFromByteBufferPositionedReadable() throws Exception { + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + CompletableFuture result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertFutureCompletedSuccessfully(result); + ByteBuffer buffer = result.get(); + assertEquals("Size of result buffer", 100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertFutureFailedExceptionally(result); + } + + static void runReadRangeFromPositionedReadable(IntFunction allocate) + throws Exception { + PositionedReadable stream = Mockito.mock(PositionedReadable.class); + Mockito.doAnswer(invocation -> { + byte b=0; + byte[] buffer = invocation.getArgument(1); + for(int i=0; i < buffer.length; ++i) { + buffer[i] = b++; + } + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + CompletableFuture result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + allocate); + assertFutureCompletedSuccessfully(result); + ByteBuffer buffer = result.get(); + assertEquals("Size of result buffer", 100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertFutureFailedExceptionally(result); + } + + @Test + public void testReadRangeArray() throws Exception { + runReadRangeFromPositionedReadable(ByteBuffer::allocate); + } + + @Test + public void testReadRangeDirect() throws Exception { + runReadRangeFromPositionedReadable(ByteBuffer::allocateDirect); + } + + static void validateBuffer(String message, ByteBuffer buffer, int start) { + byte expected = (byte) start; + while (buffer.remaining() > 0) { + assertEquals(message + " remain: " + buffer.remaining(), expected++, + buffer.get()); + } + } + + @Test + public void testReadVectored() throws Exception { + List input = Arrays.asList(new FileRangeImpl(0, 100), + new FileRangeImpl(100_000, 100), + new FileRangeImpl(200_000, 100)); + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + // should not merge the ranges + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100); + Mockito.verify(stream, Mockito.times(3)) + .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); + for(int b=0; b < input.size(); ++b) { + validateBuffer("buffer " + b, input.get(b).getData().get(), 0); + } + } + + @Test + public void testReadVectoredMerge() throws Exception { + List input = Arrays.asList(new FileRangeImpl(2000, 100), + new FileRangeImpl(1000, 100), + new FileRangeImpl(0, 100)); + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + // should merge the ranges into a single read + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100); + Mockito.verify(stream, Mockito.times(1)) + .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); + for(int b=0; b < input.size(); ++b) { + validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java index 142669b786..f83ef9e63d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java @@ -19,6 +19,9 @@ package org.apache.hadoop.test; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; import org.junit.Assert; /** @@ -28,17 +31,18 @@ public class MoreAsserts { /** * Assert equivalence for array and iterable - * @param the type of the elements - * @param s the name/message for the collection - * @param expected the expected array of elements - * @param actual the actual iterable of elements + * + * @param the type of the elements + * @param s the name/message for the collection + * @param expected the expected array of elements + * @param actual the actual iterable of elements */ public static void assertEquals(String s, T[] expected, Iterable actual) { Iterator it = actual.iterator(); int i = 0; for (; i < expected.length && it.hasNext(); ++i) { - Assert.assertEquals("Element "+ i +" for "+ s, expected[i], it.next()); + Assert.assertEquals("Element " + i + " for " + s, expected[i], it.next()); } Assert.assertTrue("Expected more elements", i == expected.length); Assert.assertTrue("Expected less elements", !it.hasNext()); @@ -46,7 +50,8 @@ public static void assertEquals(String s, T[] expected, /** * Assert equality for two iterables - * @param the type of the elements + * + * @param the type of the elements * @param s * @param expected * @param actual @@ -57,10 +62,28 @@ public static void assertEquals(String s, Iterable expected, Iterator ita = actual.iterator(); int i = 0; while (ite.hasNext() && ita.hasNext()) { - Assert.assertEquals("Element "+ i +" for "+s, ite.next(), ita.next()); + Assert.assertEquals("Element " + i + " for " + s, ite.next(), ita.next()); } Assert.assertTrue("Expected more elements", !ite.hasNext()); Assert.assertTrue("Expected less elements", !ita.hasNext()); } + + public static void assertFutureCompletedSuccessfully(CompletableFuture future) { + Assertions.assertThat(future.isDone()) + .describedAs("This future is supposed to be " + + "completed successfully") + .isTrue(); + Assertions.assertThat(future.isCompletedExceptionally()) + .describedAs("This future is supposed to be " + + "completed successfully") + .isFalse(); + } + + public static void assertFutureFailedExceptionally(CompletableFuture future) { + Assertions.assertThat(future.isCompletedExceptionally()) + .describedAs("This future is supposed to be " + + "completed exceptionally") + .isTrue(); + } } diff --git a/hadoop-common-project/pom.xml b/hadoop-common-project/pom.xml index 6064b8c494..b2a5fd3a4f 100644 --- a/hadoop-common-project/pom.xml +++ b/hadoop-common-project/pom.xml @@ -56,5 +56,4 @@ - diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 8dc1862ed5..39dac39eac 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -218,6 +218,7 @@ v12.22.1 v1.22.5 1.10.11 + 1.20 @@ -1603,6 +1604,16 @@ + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + org.apache.curator curator-test diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 149216441f..943416e2d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1463,11 +1463,12 @@ private FSDataInputStream executeOpen( fileInformation.applyOptions(readContext); LOG.debug("Opening '{}'", readContext); return new FSDataInputStream( - new S3AInputStream( - readContext.build(), - createObjectAttributes(path, fileStatus), - createInputStreamCallbacks(auditSpan), - inputStreamStats)); + new S3AInputStream( + readContext.build(), + createObjectAttributes(path, fileStatus), + createInputStreamCallbacks(auditSpan), + inputStreamStats, + unboundedThreadPool)); } /** @@ -4925,9 +4926,8 @@ public AWSCredentialProviderList shareCredentials(final String purpose) { /** * This is a proof of concept of a select API. * @param source path to source data - * @param expression select expression * @param options request configuration from the builder. - * @param providedStatus any passed in status + * @param fileInformation any passed in information. * @return the stream of the results * @throws IOException IO failure */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 6beeb2891e..05d9c7f9fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -19,38 +19,49 @@ package org.apache.hadoop.fs.s3a; import javax.annotation.Nullable; - -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; - -import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.util.Preconditions; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.CanSetReadahead; -import org.apache.hadoop.fs.CanUnbuffer; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; -import org.apache.hadoop.fs.s3a.impl.ChangeTracker; -import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.fs.PathIOException; -import org.apache.hadoop.fs.StreamCapabilities; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.util.functional.CallableRaisingIOE; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.IntFunction; + +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.impl.CombinedFileRange; +import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.apache.hadoop.fs.impl.VectoredReadUtils; +import org.apache.hadoop.fs.s3a.impl.ChangeTracker; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.functional.CallableRaisingIOE; import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -88,6 +99,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, * size of a buffer to create when draining the stream. */ private static final int DRAIN_BUFFER_SIZE = 16384; + /** + * This is the maximum temporary buffer size we use while + * populating the data in direct byte buffers during a vectored IO + * operation. This is to ensure that when a big range of data is + * requested in direct byte buffer doesn't leads to OOM errors. + */ + private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024; /** * This is the public position; the one set in {@link #seek(long)} @@ -111,6 +129,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, private S3ObjectInputStream wrappedStream; private final S3AReadOpContext context; private final InputStreamCallbacks client; + + /** + * Thread pool used for vectored IO operation. + */ + private final ThreadPoolExecutor unboundedThreadPool; private final String bucket; private final String key; private final String pathStr; @@ -160,12 +183,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, * @param ctx operation context * @param s3Attributes object attributes * @param client S3 client to use - * @param streamStatistics statistics for this stream + * @param unboundedThreadPool thread pool to use. */ public S3AInputStream(S3AReadOpContext ctx, - S3ObjectAttributes s3Attributes, - InputStreamCallbacks client, - S3AInputStreamStatistics streamStatistics) { + S3ObjectAttributes s3Attributes, + InputStreamCallbacks client, + S3AInputStreamStatistics streamStatistics, + ThreadPoolExecutor unboundedThreadPool) { Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket"); Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); @@ -187,6 +211,7 @@ public S3AInputStream(S3AReadOpContext ctx, setInputPolicy(ctx.getInputPolicy()); setReadahead(ctx.getReadahead()); this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); + this.unboundedThreadPool = unboundedThreadPool; } /** @@ -880,6 +905,231 @@ public void readFully(long position, byte[] buffer, int offset, int length) } } + /** + * {@inheritDoc} + * Vectored read implementation for S3AInputStream. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + + LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); + checkNotClosed(); + for (FileRange range : ranges) { + validateRangeRequest(range); + CompletableFuture result = new CompletableFuture<>(); + range.setData(result); + } + + if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) { + LOG.debug("Not merging the ranges as they are disjoint"); + for(FileRange range: ranges) { + ByteBuffer buffer = allocate.apply(range.getLength()); + unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); + } + } else { + LOG.debug("Trying to merge the ranges as they are not disjoint"); + List combinedFileRanges = sortAndMergeRanges(ranges, + 1, minSeekForVectorReads(), + maxReadSizeForVectorReads()); + LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", + ranges.size(), combinedFileRanges.size()); + for(CombinedFileRange combinedFileRange: combinedFileRanges) { + CompletableFuture result = new CompletableFuture<>(); + ByteBuffer buffer = allocate.apply(combinedFileRange.getLength()); + combinedFileRange.setData(result); + unboundedThreadPool.submit( + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer)); + } + } + LOG.debug("Finished submitting vectored read to threadpool" + + " on path {} for ranges {} ", pathStr, ranges); + } + + /** + * Read data in the combinedFileRange and update data in buffers + * of all underlying ranges. + * @param combinedFileRange combined range. + * @param buffer combined buffer. + */ + private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, + ByteBuffer buffer) { + // Not putting read single range call inside try block as + // exception if any occurred during this call will be raised + // during awaitFuture call while getting the combined buffer. + readSingleRange(combinedFileRange, buffer); + try { + // In case of single range we return the original byte buffer else + // we return slice byte buffers for each child ranges. + ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData()); + if (combinedFileRange.getUnderlying().size() == 1) { + combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer); + } else { + for (FileRange child : combinedFileRange.getUnderlying()) { + updateOriginalRange(child, combinedBuffer, combinedFileRange); + } + } + } catch (Exception ex) { + LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex); + for(FileRange child : combinedFileRange.getUnderlying()) { + child.getData().completeExceptionally(ex); + } + } + } + + /** + * Update data in child range from combined range. + * @param child child range. + * @param combinedBuffer combined buffer. + * @param combinedFileRange combined range. + */ + private void updateOriginalRange(FileRange child, + ByteBuffer combinedBuffer, + CombinedFileRange combinedFileRange) { + LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ", + child.getOffset(), child.getLength(), + combinedFileRange.getOffset(), combinedFileRange.getLength()); + ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child); + child.getData().complete(childBuffer); + LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ", + child.getOffset(), child.getLength(), + combinedFileRange.getOffset(), combinedFileRange.getLength()); + } + + /** + * // Check if we can use contentLength returned by http GET request. + * Validates range parameters. + * @param range requested range. + * @throws EOFException end of file exception. + */ + private void validateRangeRequest(FileRange range) throws EOFException { + VectoredReadUtils.validateRangeRequest(range); + if(range.getOffset() + range.getLength() > contentLength) { + LOG.warn("Requested range [{}, {}) is beyond EOF for path {}", + range.getOffset(), range.getLength(), pathStr); + throw new EOFException("Requested range [" + range.getOffset() +", " + + range.getLength() + ") is beyond EOF for path " + pathStr); + } + } + + /** + * TODO: Add retry in client.getObject(). not present in older reads why here?? + * Okay retry is being done in the top layer during read. + * But if we do here in the top layer, one issue I am thinking is + * what if there is some error which happened during filling the buffer + * If we retry that old offsets of heap buffers can be overwritten ? + * I think retry should be only added in {@link S3AInputStream#getS3Object} + * Read data from S3 for this range and populate the bufffer. + * @param range range of data to read. + * @param buffer buffer to fill. + */ + private void readSingleRange(FileRange range, ByteBuffer buffer) { + LOG.debug("Start reading range {} from path {} ", range, pathStr); + S3Object objectRange = null; + S3ObjectInputStream objectContent = null; + try { + long position = range.getOffset(); + int length = range.getLength(); + final String operationName = "readRange"; + objectRange = getS3Object(operationName, position, length); + objectContent = objectRange.getObjectContent(); + if (objectContent == null) { + throw new PathIOException(uri, + "Null IO stream received during " + operationName); + } + populateBuffer(length, buffer, objectContent); + range.getData().complete(buffer); + } catch (Exception ex) { + LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex); + range.getData().completeExceptionally(ex); + } finally { + IOUtils.cleanupWithLogger(LOG, objectRange, objectContent); + } + LOG.debug("Finished reading range {} from path {} ", range, pathStr); + } + + /** + * Populates the buffer with data from objectContent + * till length. Handles both direct and heap byte buffers. + * @param length length of data to populate. + * @param buffer buffer to fill. + * @param objectContent result retrieved from S3 store. + * @throws IOException any IOE. + */ + private void populateBuffer(int length, + ByteBuffer buffer, + S3ObjectInputStream objectContent) throws IOException { + if (buffer.isDirect()) { + int readBytes = 0; + int offset = 0; + byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE]; + while (readBytes < length) { + int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ? + TMP_BUFFER_MAX_SIZE + : length - readBytes; + readByteArray(objectContent, tmp, 0, currentLength); + buffer.put(tmp, 0, currentLength); + offset = offset + currentLength; + readBytes = readBytes + currentLength; + } + buffer.flip(); + } else { + readByteArray(objectContent, buffer.array(), 0, length); + } + } + + public void readByteArray(S3ObjectInputStream objectContent, + byte[] dest, + int offset, + int length) throws IOException { + int readBytes = 0; + while (readBytes < length) { + int readBytesCurr = objectContent.read(dest, + offset + readBytes, + length - readBytes); + readBytes +=readBytesCurr; + if (readBytesCurr < 0) { + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + } + } + + /** + * Read data from S3 using a http request. + * This also handles if file has been changed while http call + * is getting executed. If file has been changed RemoteFileChangedException + * is thrown. + * @param operationName name of the operation for which get object on S3 is called. + * @param position position of the object to be read from S3. + * @param length length from position of the object to be read from S3. + * @return S3Object + * @throws IOException exception if any. + */ + private S3Object getS3Object(String operationName, long position, + int length) throws IOException { + final GetObjectRequest request = client.newGetRequest(key) + .withRange(position, position + length - 1); + changeTracker.maybeApplyConstraint(request); + DurationTracker tracker = streamStatistics.initiateGetRequest(); + S3Object objectRange; + Invoker invoker = context.getReadInvoker(); + try { + objectRange = invoker.retry(operationName, pathStr, true, + () -> client.getObject(request)); + } catch (IOException ex) { + tracker.failed(); + throw ex; + } finally { + tracker.close(); + } + changeTracker.processResponse(objectRange, operationName, + position); + return objectRange; + } + /** * Access the input stream statistics. * This is for internal testing and may be removed without warning. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java new file mode 100644 index 0000000000..255cc6501c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +import java.util.ArrayList; +import java.util.List; + +public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { + + public ITestS3AContractVectoredRead(String bufferType) { + super(bufferType); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + /** + * Overriding in S3 vectored read api fails fast in case of EOF + * requested range. + * @throws Exception + */ + @Override + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java index 62f5bff35c..c62bf5daca 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -111,7 +111,8 @@ private S3AInputStream getMockedS3AInputStream() { s3AReadOpContext, s3ObjectAttributes, getMockedInputStreamCallback(), - s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics()); + s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(), + null); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index fc287e9845..c831999008 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -52,7 +52,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN # for debugging low level S3a operations, uncomment these lines # Log all S3A classes -#log4j.logger.org.apache.hadoop.fs.s3a=DEBUG +log4j.logger.org.apache.hadoop.fs.s3a=DEBUG #log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO #log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO diff --git a/hadoop-tools/hadoop-benchmark/pom.xml b/hadoop-tools/hadoop-benchmark/pom.xml new file mode 100644 index 0000000000..3d742fab5c --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/pom.xml @@ -0,0 +1,94 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 3.4.0-SNAPSHOT + ../../hadoop-project/pom.xml + + hadoop-benchmark + 3.4.0-SNAPSHOT + jar + + Apache Hadoop Common Benchmark + Apache Hadoop Common Benchmark + + + + org.apache.hadoop + hadoop-common + + + org.openjdk.jmh + jmh-core + + + org.openjdk.jmh + jmh-generator-annprocess + + + + + + + maven-assembly-plugin + + + + org.apache.hadoop.benchmark.VectoredReadBenchmark + + + + src/main/assembly/uber.xml + + + + + make-assembly + package + + single + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + ${basedir}/src/main/findbugs/exclude.xml + + + + com.github.spotbugs + spotbugs-maven-plugin + + ${basedir}/src/main/findbugs/exclude.xml + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + diff --git a/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml b/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml new file mode 100644 index 0000000000..014eab951b --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml @@ -0,0 +1,33 @@ + + + uber + + jar + + false + + + / + true + true + runtime + + + + + metaInf-services + + + diff --git a/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml b/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml new file mode 100644 index 0000000000..05f2a067cf --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml @@ -0,0 +1,22 @@ + + + + + + + + + + diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java new file mode 100644 index 0000000000..aaee951d72 --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.benchmark; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.FileSystems; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class VectoredReadBenchmark { + + static final Path DATA_PATH = getTestDataPath(); + static final String DATA_PATH_PROPERTY = "bench.data"; + static final int READ_SIZE = 64 * 1024; + static final long SEEK_SIZE = 1024L * 1024; + + + static Path getTestDataPath() { + String value = System.getProperty(DATA_PATH_PROPERTY); + return new Path(value == null ? "/tmp/taxi.orc" : value); + } + + @State(Scope.Thread) + public static class FileSystemChoice { + + @Param({"local", "raw"}) + private String fileSystemKind; + + private Configuration conf; + private FileSystem fs; + + @Setup(Level.Trial) + public void setup() { + conf = new Configuration(); + try { + LocalFileSystem local = FileSystem.getLocal(conf); + fs = "raw".equals(fileSystemKind) ? local.getRaw() : local; + } catch (IOException e) { + throw new IllegalArgumentException("Can't get filesystem", e); + } + } + } + + @State(Scope.Thread) + public static class BufferChoice { + @Param({"direct", "array"}) + private String bufferKind; + + private IntFunction allocate; + @Setup(Level.Trial) + public void setup() { + allocate = "array".equals(bufferKind) + ? ByteBuffer::allocate : ByteBuffer::allocateDirect; + } + } + + @Benchmark + public void asyncRead(FileSystemChoice fsChoice, + BufferChoice bufferChoice, + Blackhole blackhole) throws Exception { + FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); + List ranges = new ArrayList<>(); + for(int m=0; m < 100; ++m) { + FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE); + ranges.add(range); + } + stream.readVectored(ranges, bufferChoice.allocate); + for(FileRange range: ranges) { + blackhole.consume(range.getData().get()); + } + stream.close(); + } + + static class Joiner implements CompletionHandler { + private int remaining; + private final ByteBuffer[] result; + private Throwable exception = null; + + Joiner(int total) { + remaining = total; + result = new ByteBuffer[total]; + } + + synchronized void finish() { + remaining -= 1; + if (remaining == 0) { + notify(); + } + } + + synchronized ByteBuffer[] join() throws InterruptedException, IOException { + while (remaining > 0 && exception == null) { + wait(); + } + if (exception != null) { + throw new IOException("problem reading", exception); + } + return result; + } + + + @Override + public synchronized void completed(ByteBuffer buffer, FileRange attachment) { + result[--remaining] = buffer; + if (remaining == 0) { + notify(); + } + } + + @Override + public synchronized void failed(Throwable exc, FileRange attachment) { + this.exception = exc; + notify(); + } + } + + static class FileRangeCallback extends FileRangeImpl implements + CompletionHandler { + private final AsynchronousFileChannel channel; + private final ByteBuffer buffer; + private int completed = 0; + private final Joiner joiner; + + FileRangeCallback(AsynchronousFileChannel channel, long offset, + int length, Joiner joiner, ByteBuffer buffer) { + super(offset, length); + this.channel = channel; + this.joiner = joiner; + this.buffer = buffer; + } + + @Override + public void completed(Integer result, FileRangeCallback attachment) { + final int bytes = result; + if (bytes == -1) { + failed(new EOFException("Read past end of file"), this); + } + completed += bytes; + if (completed < this.getLength()) { + channel.read(buffer, this.getOffset() + completed, this, this); + } else { + buffer.flip(); + joiner.finish(); + } + } + + @Override + public void failed(Throwable exc, FileRangeCallback attachment) { + joiner.failed(exc, this); + } + } + + @Benchmark + public void asyncFileChanArray(BufferChoice bufferChoice, + Blackhole blackhole) throws Exception { + java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString()); + AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ); + List ranges = new ArrayList<>(); + Joiner joiner = new Joiner(100); + for(int m=0; m < 100; ++m) { + ByteBuffer buffer = bufferChoice.allocate.apply(READ_SIZE); + FileRangeCallback range = new FileRangeCallback(channel, m * SEEK_SIZE, + READ_SIZE, joiner, buffer); + ranges.add(range); + channel.read(buffer, range.getOffset(), range, range); + } + joiner.join(); + channel.close(); + blackhole.consume(ranges); + } + + @Benchmark + public void syncRead(FileSystemChoice fsChoice, + Blackhole blackhole) throws Exception { + FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); + List result = new ArrayList<>(); + for(int m=0; m < 100; ++m) { + byte[] buffer = new byte[READ_SIZE]; + stream.readFully(m * SEEK_SIZE, buffer); + result.add(buffer); + } + blackhole.consume(result); + stream.close(); + } + + /** + * Run the benchmarks. + * @param args the pathname of a 100MB data file + * @throws Exception any ex. + */ + public static void main(String[] args) throws Exception { + OptionsBuilder opts = new OptionsBuilder(); + opts.include("VectoredReadBenchmark"); + opts.jvmArgs("-server", "-Xms256m", "-Xmx2g", + "-D" + DATA_PATH_PROPERTY + "=" + args[0]); + opts.forks(1); + new Runner(opts.build()).run(); + } +} diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/package-info.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/package-info.java new file mode 100644 index 0000000000..95d6977e3a --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Benchmark for Vectored Read IO operations. + */ +package org.apache.hadoop.benchmark; diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml index 860e71a8ea..b05e489261 100644 --- a/hadoop-tools/pom.xml +++ b/hadoop-tools/pom.xml @@ -50,6 +50,7 @@ hadoop-azure-datalake hadoop-aliyun hadoop-fs2img + hadoop-benchmark