HADOOP-11867. Add a high-performance vectored read API. (#3904)

part of HADOOP-18103.
Add support for multiple ranged vectored read api in PositionedReadable.
The default iterates through the ranges to read each synchronously,
but the intent is that FSDataInputStream subclasses can make more
efficient readers especially in object stores implementation.

Also added implementation in S3A where smaller ranges are merged and
sliced byte buffers are returned to the readers. All the merged ranged are
fetched from S3 asynchronously.

Contributed By: Owen O'Malley and Mukund Thakur
This commit is contained in:
Mukund Thakur 2022-02-01 19:52:38 +05:30 committed by Steve Loughran
parent e6ecc4f3e4
commit 2daf0a814f
30 changed files with 2448 additions and 98 deletions

View File

@ -47,7 +47,7 @@ pipeline {
options {
buildDiscarder(logRotator(numToKeepStr: '5'))
timeout (time: 24, unit: 'HOURS')
timeout (time: 48, unit: 'HOURS')
timestamps()
checkoutToSubdirectory('src')
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -22,6 +22,9 @@
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.StringJoiner;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -158,8 +161,24 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
return new StringJoiner(", ",
BufferedFSInputStream.class.getSimpleName() + "[", "]")
.add("in=" + in)
.toString();
BufferedFSInputStream.class.getSimpleName() + "[", "]")
.add("in=" + in)
.toString();
}
@Override
public int minSeekForVectorReads() {
return ((PositionedReadable) in).minSeekForVectorReads();
}
@Override
public int maxReadSizeForVectorReads() {
return ((PositionedReadable) in).maxReadSizeForVectorReads();
}
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}
}

View File

@ -22,17 +22,25 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.IntFunction;
import java.util.zip.CRC32;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
@ -66,7 +74,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
public static double getApproxChkSumLength(long size) {
return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
}
public ChecksumFileSystem(FileSystem fs) {
super(fs);
}
@ -82,7 +90,7 @@ public void setConf(Configuration conf) {
bytesPerChecksum);
}
}
/**
* Set whether to verify checksum.
*/
@ -95,7 +103,7 @@ public void setVerifyChecksum(boolean verifyChecksum) {
public void setWriteChecksum(boolean writeChecksum) {
this.writeChecksum = writeChecksum;
}
/** get the raw file system */
@Override
public FileSystem getRawFileSystem() {
@ -162,18 +170,18 @@ private static class ChecksumFSInputChecker extends FSInputChecker implements
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
private static final int HEADER_LENGTH = 8;
private int bytesPerSum = 1;
public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
throws IOException {
this(fs, file, fs.getConf().getInt(
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
}
public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
throws IOException {
super( file, fs.getFileStatus(file).getReplication() );
@ -189,7 +197,8 @@ public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
if (!Arrays.equals(version, CHECKSUM_VERSION))
throw new IOException("Not a checksum file: "+sumFile);
this.bytesPerSum = sums.readInt();
set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum,
FSInputChecker.CHECKSUM_SIZE);
} catch (IOException e) {
// mincing the message is terrible, but java throws permission
// exceptions as FNF because that's all the method signatures allow!
@ -201,21 +210,21 @@ public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
set(fs.verifyChecksum, null, 1, 0);
}
}
private long getChecksumFilePos( long dataPos ) {
return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
return HEADER_LENGTH + FSInputChecker.CHECKSUM_SIZE*(dataPos/bytesPerSum);
}
@Override
protected long getChunkPosition( long dataPos ) {
return dataPos/bytesPerSum*bytesPerSum;
}
@Override
public int available() throws IOException {
return datas.available() + super.available();
}
@Override
public int read(long position, byte[] b, int off, int len)
throws IOException {
@ -233,7 +242,7 @@ public int read(long position, byte[] b, int off, int len)
}
return nread;
}
@Override
public void close() throws IOException {
datas.close();
@ -242,7 +251,7 @@ public void close() throws IOException {
}
set(fs.verifyChecksum, null, 1, 0);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
@ -265,7 +274,7 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
final int checksumsToRead = Math.min(
len/bytesPerSum, // number of checksums based on len to read
checksum.length / CHECKSUM_SIZE); // size of checksum buffer
long checksumPos = getChecksumFilePos(pos);
long checksumPos = getChecksumFilePos(pos);
if(checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
@ -305,8 +314,129 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
public static long findChecksumOffset(long dataOffset,
int bytesPerSum) {
return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE;
}
/**
* Find the checksum ranges that correspond to the given data ranges.
* @param dataRanges the input data ranges, which are assumed to be sorted
* and non-overlapping
* @return a list of AsyncReaderUtils.CombinedFileRange that correspond to
* the checksum ranges
*/
public static List<CombinedFileRange> findChecksumRanges(
List<? extends FileRange> dataRanges,
int bytesPerSum,
int minSeek,
int maxSize) {
List<CombinedFileRange> result = new ArrayList<>();
CombinedFileRange currentCrc = null;
for(FileRange range: dataRanges) {
long crcOffset = findChecksumOffset(range.getOffset(), bytesPerSum);
long crcEnd = findChecksumOffset(range.getOffset() + range.getLength() +
bytesPerSum - 1, bytesPerSum);
if (currentCrc == null ||
!currentCrc.merge(crcOffset, crcEnd, range, minSeek, maxSize)) {
currentCrc = new CombinedFileRange(crcOffset, crcEnd, range);
result.add(currentCrc);
}
}
return result;
}
/**
* Check the data against the checksums.
* @param sumsBytes the checksum data
* @param sumsOffset where from the checksum file this buffer started
* @param data the file data
* @param dataOffset where the file data started (must be a multiple of
* bytesPerSum)
* @param bytesPerSum how many bytes per a checksum
* @param file the path of the filename
* @return the data buffer
* @throws CompletionException if the checksums don't match
*/
static ByteBuffer checkBytes(ByteBuffer sumsBytes,
long sumsOffset,
ByteBuffer data,
long dataOffset,
int bytesPerSum,
Path file) {
// determine how many bytes we need to skip at the start of the sums
int offset =
(int) (findChecksumOffset(dataOffset, bytesPerSum) - sumsOffset);
IntBuffer sums = sumsBytes.asIntBuffer();
sums.position(offset / FSInputChecker.CHECKSUM_SIZE);
ByteBuffer current = data.duplicate();
int numChunks = data.remaining() / bytesPerSum;
CRC32 crc = new CRC32();
// check each chunk to ensure they match
for(int c = 0; c < numChunks; ++c) {
// set the buffer position and the limit
current.limit((c + 1) * bytesPerSum);
current.position(c * bytesPerSum);
// compute the crc
crc.reset();
crc.update(current);
int expected = sums.get();
int calculated = (int) crc.getValue();
if (calculated != expected) {
// cast of c added to silence findbugs
long errPosn = dataOffset + (long) c * bytesPerSum;
throw new CompletionException(new ChecksumException(
"Checksum error: " + file + " at " + errPosn +
" exp: " + expected + " got: " + calculated, errPosn));
}
}
// if everything matches, we return the data
return data;
}
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
// If the stream doesn't have checksums, just delegate.
VectoredReadUtils.validateVectoredReadRanges(ranges);
if (sums == null) {
datas.readVectored(ranges, allocate);
return;
}
int minSeek = minSeekForVectorReads();
int maxSize = maxReadSizeForVectorReads();
List<CombinedFileRange> dataRanges =
VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum,
minSeek, maxReadSizeForVectorReads());
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
// Data read is correct. I have verified content of dataRanges.
// There is some bug below here as test (testVectoredReadMultipleRanges)
// is failing, should be
// somewhere while slicing the merged data into smaller user ranges.
// Spend some time figuring out but it is a complex code.
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
CompletableFuture<ByteBuffer> result =
checksumRange.getData().thenCombineAsync(dataRange.getData(),
(sumBuffer, dataBuffer) ->
checkBytes(sumBuffer, checksumRange.getOffset(),
dataBuffer, dataRange.getOffset(), bytesPerSum, file));
// Now, slice the read data range to the user's ranges
for(FileRange original: ((CombinedFileRange) dataRange).getUnderlying()) {
original.setData(result.thenApply(
(b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
}
}
}
}
}
private static class FSDataBoundedInputStream extends FSDataInputStream {
private FileSystem fs;
private Path file;
@ -317,12 +447,12 @@ private static class FSDataBoundedInputStream extends FSDataInputStream {
this.fs = fs;
this.file = file;
}
@Override
public boolean markSupported() {
return false;
}
/* Return the file length */
private long getFileLength() throws IOException {
if( fileLen==-1L ) {
@ -330,7 +460,7 @@ private long getFileLength() throws IOException {
}
return fileLen;
}
/**
* Skips over and discards <code>n</code> bytes of data from the
* input stream.
@ -354,11 +484,11 @@ public synchronized long skip(long n) throws IOException {
}
return super.skip(n);
}
/**
* Seek to the given position in the stream.
* The next read() will be from that position.
*
*
* <p>This method does not allow seek past the end of the file.
* This produces IOException.
*
@ -424,22 +554,22 @@ public void concat(final Path f, final Path[] psrcs) throws IOException {
*/
public static long getChecksumLength(long size, int bytesPerSum) {
//the checksum length is equal to size passed divided by bytesPerSum +
//bytes written in the beginning of the checksum file.
return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
CHECKSUM_VERSION.length + 4;
//bytes written in the beginning of the checksum file.
return ((size + bytesPerSum - 1) / bytesPerSum) * FSInputChecker.CHECKSUM_SIZE +
ChecksumFSInputChecker.HEADER_LENGTH;
}
/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer
implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
private boolean isClosed = false;
public ChecksumFSOutputSummer(ChecksumFileSystem fs,
Path file,
ChecksumFSOutputSummer(ChecksumFileSystem fs,
Path file,
boolean overwrite,
int bufferSize,
short replication,
@ -460,7 +590,7 @@ public ChecksumFSOutputSummer(ChecksumFileSystem fs,
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
sums.writeInt(bytesPerSum);
}
@Override
public void close() throws IOException {
try {
@ -471,7 +601,7 @@ public void close() throws IOException {
isClosed = true;
}
}
@Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
@ -727,7 +857,7 @@ public boolean rename(Path src, Path dst) throws IOException {
value = fs.rename(srcCheckFile, dstCheckFile);
} else if (fs.exists(dstCheckFile)) {
// no src checksum, so remove dst checksum
value = fs.delete(dstCheckFile, true);
value = fs.delete(dstCheckFile, true);
}
return value;
@ -759,7 +889,7 @@ public boolean delete(Path f, boolean recursive) throws IOException{
return fs.delete(f, true);
}
}
final private static PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
@ -770,7 +900,7 @@ public boolean accept(Path file) {
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
*
* @param f
* given path
* @return the statuses of the files/directories in the given path
@ -791,7 +921,7 @@ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
*
* @param f
* given path
* @return the statuses of the files/directories in the given patch
@ -802,7 +932,7 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return fs.listLocatedStatus(f, DEFAULT_FILTER);
}
@Override
public boolean mkdirs(Path f) throws IOException {
return fs.mkdirs(f);
@ -856,7 +986,7 @@ public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
} else {
FileStatus[] srcs = listStatus(src);
for (FileStatus srcFile : srcs) {
copyToLocalFile(srcFile.getPath(),
copyToLocalFile(srcFile.getPath(),
new Path(dst, srcFile.getPath().getName()), copyCrc);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -26,6 +26,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -51,7 +53,7 @@ public class FSDataInputStream extends DataInputStream
*/
private final IdentityHashStore<ByteBuffer, ByteBufferPool>
extendedReadBuffers
= new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
= new IdentityHashStore<>(0);
public FSDataInputStream(InputStream in) {
super(in);
@ -279,4 +281,20 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(in);
}
@Override
public int minSeekForVectorReads() {
return ((PositionedReadable) in).minSeekForVectorReads();
}
@Override
public int maxReadSizeForVectorReads() {
return ((PositionedReadable) in).maxReadSizeForVectorReads();
}
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
/**
* A byte range of a file.
* This is used for the asynchronous gather read API of
* {@link PositionedReadable#readVectored}.
*/
public interface FileRange {
/**
* Get the starting offset of the range.
* @return the byte offset of the start
*/
long getOffset();
/**
* Get the length of the range.
* @return the number of bytes in the range.
*/
int getLength();
/**
* Get the future data for this range.
* @return the future for the {@link ByteBuffer} that contains the data
*/
CompletableFuture<ByteBuffer> getData();
/**
* Set a future for this range's data.
* This method is called by {@link PositionedReadable#readVectored} to store the
* data for the user to pick up later via {@link #getData}.
* @param data the future of the ByteBuffer that will have the data
*/
void setData(CompletableFuture<ByteBuffer> data);
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
/**
* A range of bytes from a file with an optional buffer to read those bytes
* for zero copy.
*/
public class FileRangeImpl implements FileRange {
private long offset;
private int length;
private CompletableFuture<ByteBuffer> reader;
public FileRangeImpl(long offset, int length) {
this.offset = offset;
this.length = length;
}
@Override
public String toString() {
return "range[" + offset + "," + (offset + length) + ")";
}
@Override
public long getOffset() {
return offset;
}
@Override
public int getLength() {
return length;
}
public void setOffset(long offset) {
this.offset = offset;
}
public void setLength(int length) {
this.length = length;
}
@Override
public void setData(CompletableFuture<ByteBuffer> pReader) {
this.reader = pReader;
}
@Override
public CompletableFuture<ByteBuffer> getData() {
return reader;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,10 +17,15 @@
*/
package org.apache.hadoop.fs;
import java.io.*;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
/**
* Stream that permits positional reading.
@ -85,4 +90,38 @@ void readFully(long position, byte[] buffer, int offset, int length)
* the read operation completed
*/
void readFully(long position, byte[] buffer) throws IOException;
/**
* What is the smallest reasonable seek?
* @return the minimum number of bytes
*/
default int minSeekForVectorReads() {
return 4 * 1024;
}
/**
* What is the largest size that we should group ranges together as?
* @return the number of bytes to read at once
*/
default int maxReadSizeForVectorReads() {
return 1024 * 1024;
}
/**
* Read fully a list of file ranges asynchronously from this file.
* The default iterates through the ranges to read each synchronously, but
* the intent is that FSDataInputStream subclasses can make more efficient
* readers.
* As a result of the call, each range will have FileRange.setData(CompletableFuture)
* called with a future that when complete will have a ByteBuffer with the
* data from the file's range.
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @throws IOException any IOE.
*/
default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(),
maxReadSizeForVectorReads());
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,6 +20,7 @@
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import java.io.BufferedOutputStream;
import java.io.DataOutput;
@ -33,8 +34,11 @@
import java.io.FileDescriptor;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime;
@ -44,6 +48,9 @@
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -130,7 +137,9 @@ public void initialize(URI uri, Configuration conf) throws IOException {
class LocalFSFileInputStream extends FSInputStream implements
HasFileDescriptor, IOStatisticsSource, StreamCapabilities {
private FileInputStream fis;
private final File name;
private long position;
private AsynchronousFileChannel asyncChannel = null;
/**
* Minimal set of counters.
@ -148,7 +157,8 @@ class LocalFSFileInputStream extends FSInputStream implements
private final AtomicLong bytesRead;
public LocalFSFileInputStream(Path f) throws IOException {
fis = new FileInputStream(pathToFile(f));
name = pathToFile(f);
fis = new FileInputStream(name);
bytesRead = ioStatistics.getCounterReference(
STREAM_READ_BYTES);
}
@ -179,10 +189,16 @@ public boolean seekToNewSource(long targetPos) throws IOException {
@Override
public int available() throws IOException { return fis.available(); }
@Override
public void close() throws IOException { fis.close(); }
@Override
public boolean markSupported() { return false; }
@Override
public void close() throws IOException {
fis.close();
if (asyncChannel != null) {
asyncChannel.close();
}
}
@Override
public int read() throws IOException {
try {
@ -272,8 +288,88 @@ public boolean hasCapability(String capability) {
public IOStatistics getIOStatistics() {
return ioStatistics;
}
AsynchronousFileChannel getAsyncChannel() throws IOException {
if (asyncChannel == null) {
synchronized (this) {
asyncChannel = AsynchronousFileChannel.open(name.toPath(),
StandardOpenOption.READ);
}
}
return asyncChannel;
}
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: ranges) {
VectoredReadUtils.validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[ranges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers);
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler);
}
} catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe);
for(FileRange range: ranges) {
range.getData().completeExceptionally(ioe);
}
}
}
}
/**
* A CompletionHandler that implements readFully and translates back
* into the form of CompletionHandler that our users expect.
*/
static class AsyncHandler implements CompletionHandler<Integer, Integer> {
private final AsynchronousFileChannel channel;
private final List<? extends FileRange> ranges;
private final ByteBuffer[] buffers;
AsyncHandler(AsynchronousFileChannel channel,
List<? extends FileRange> ranges,
ByteBuffer[] buffers) {
this.channel = channel;
this.ranges = ranges;
this.buffers = buffers;
}
@Override
public void completed(Integer result, Integer r) {
FileRange range = ranges.get(r);
ByteBuffer buffer = buffers[r];
if (result == -1) {
failed(new EOFException("Read past End of File"), r);
} else {
if (buffer.remaining() > 0) {
// issue a read for the rest of the buffer
// QQ: What if this fails? It has the same handler.
channel.read(buffer, range.getOffset() + buffer.position(), r, this);
} else {
// QQ: Why is this required? I think because we don't want the
// user to read data beyond limit.
buffer.flip();
range.getData().complete(buffer);
}
}
}
@Override
public void failed(Throwable exc, Integer r) {
LOG.debug("Failed while reading range {} ", r, exc);
ranges.get(r).getData().completeExceptionally(exc);
}
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
getFileStatus(f);

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import java.util.ArrayList;
import java.util.List;
/**
* A file range that represents a set of underlying file ranges.
* This is used when we combine the user's FileRange objects
* together into a single read for efficiency.
*/
public class CombinedFileRange extends FileRangeImpl {
private ArrayList<FileRange> underlying = new ArrayList<>();
public CombinedFileRange(long offset, long end, FileRange original) {
super(offset, (int) (end - offset));
this.underlying.add(original);
}
/**
* Get the list of ranges that were merged together to form this one.
* @return the list of input ranges
*/
public List<FileRange> getUnderlying() {
return underlying;
}
/**
* Merge this input range into the current one, if it is compatible.
* It is assumed that otherOffset is greater or equal the current offset,
* which typically happens by sorting the input ranges on offset.
* @param otherOffset the offset to consider merging
* @param otherEnd the end to consider merging
* @param other the underlying FileRange to add if we merge
* @param minSeek the minimum distance that we'll seek without merging the
* ranges together
* @param maxSize the maximum size that we'll merge into a single range
* @return true if we have merged the range into this one
*/
public boolean merge(long otherOffset, long otherEnd, FileRange other,
int minSeek, int maxSize) {
long end = this.getOffset() + this.getLength();
long newEnd = Math.max(end, otherEnd);
if (otherOffset - end >= minSeek || newEnd - this.getOffset() > maxSize) {
return false;
}
this.setLength((int) (newEnd - this.getOffset()));
underlying.add(other);
return true;
}
}

View File

@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.util.Preconditions;
/**
* Utility class which implements helper methods used
* in vectored IO implementation.
*/
public final class VectoredReadUtils {
/**
* Validate a single range.
* @param range file range.
* @throws EOFException any EOF Exception.
*/
public static void validateRangeRequest(FileRange range)
throws EOFException {
Preconditions.checkArgument(range.getLength() >= 0, "length is negative");
if (range.getOffset() < 0) {
throw new EOFException("position is negative");
}
}
/**
* Validate a list of vectored read ranges.
* @param ranges list of ranges.
* @throws EOFException any EOF exception.
*/
public static void validateVectoredReadRanges(List<? extends FileRange> ranges)
throws EOFException {
for (FileRange range : ranges) {
validateRangeRequest(range);
}
}
/**
* Read fully a list of file ranges asynchronously from this file.
* The default iterates through the ranges to read each synchronously, but
* the intent is that subclasses can make more efficient readers.
* The data or exceptions are pushed into {@link FileRange#getData()}.
* @param stream the stream to read the data from
* @param ranges the byte ranges to read
* @param allocate the byte buffer allocation
* @param minimumSeek the minimum number of bytes to seek over
* @param maximumRead the largest number of bytes to combine into a single read
*/
public static void readVectored(PositionedReadable stream,
List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
int minimumSeek,
int maximumRead) {
if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
for(FileRange range: ranges) {
range.setData(readRangeFrom(stream, range, allocate));
}
} else {
for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
maximumRead)) {
CompletableFuture<ByteBuffer> read =
readRangeFrom(stream, range, allocate);
for(FileRange child: range.getUnderlying()) {
child.setData(read.thenApply(
(b) -> sliceTo(b, range.getOffset(), child)));
}
}
}
}
/**
* Synchronously reads a range from the stream dealing with the combinations
* of ByteBuffers buffers and PositionedReadable streams.
* @param stream the stream to read from
* @param range the range to read
* @param allocate the function to allocate ByteBuffers
* @return the CompletableFuture that contains the read data
*/
public static CompletableFuture<ByteBuffer> readRangeFrom(PositionedReadable stream,
FileRange range,
IntFunction<ByteBuffer> allocate) {
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
try {
ByteBuffer buffer = allocate.apply(range.getLength());
if (stream instanceof ByteBufferPositionedReadable) {
((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
buffer);
buffer.flip();
} else {
readNonByteBufferPositionedReadable(stream, range, buffer);
}
result.complete(buffer);
} catch (IOException ioe) {
result.completeExceptionally(ioe);
}
return result;
}
private static void readNonByteBufferPositionedReadable(PositionedReadable stream,
FileRange range,
ByteBuffer buffer) throws IOException {
if (buffer.isDirect()) {
buffer.put(readInDirectBuffer(stream, range));
buffer.flip();
} else {
stream.readFully(range.getOffset(), buffer.array(),
buffer.arrayOffset(), range.getLength());
}
}
private static byte[] readInDirectBuffer(PositionedReadable stream,
FileRange range) throws IOException {
// if we need to read data from a direct buffer and the stream doesn't
// support it, we allocate a byte array to use.
byte[] tmp = new byte[range.getLength()];
stream.readFully(range.getOffset(), tmp, 0, tmp.length);
return tmp;
}
/**
* Is the given input list.
* <ul>
* <li>already sorted by offset</li>
* <li>each range is more than minimumSeek apart</li>
* <li>the start and end of each range is a multiple of chunkSize</li>
* </ul>
*
* @param input the list of input ranges.
* @param chunkSize the size of the chunks that the offset and end must align to.
* @param minimumSeek the minimum distance between ranges.
* @return true if we can use the input list as is.
*/
public static boolean isOrderedDisjoint(List<? extends FileRange> input,
int chunkSize,
int minimumSeek) {
long previous = -minimumSeek;
for(FileRange range: input) {
long offset = range.getOffset();
long end = range.getOffset() + range.getLength();
if (offset % chunkSize != 0 ||
end % chunkSize != 0 ||
(offset - previous < minimumSeek)) {
return false;
}
previous = end;
}
return true;
}
/**
* Calculates floor value of offset based on chunk size.
* @param offset file offset.
* @param chunkSize file chunk size.
* @return floor value.
*/
public static long roundDown(long offset, int chunkSize) {
if (chunkSize > 1) {
return offset - (offset % chunkSize);
} else {
return offset;
}
}
/**
* Calculates the ceil value of offset based on chunk size.
* @param offset file offset.
* @param chunkSize file chunk size.
* @return ceil value.
*/
public static long roundUp(long offset, int chunkSize) {
if (chunkSize > 1) {
long next = offset + chunkSize - 1;
return next - (next % chunkSize);
} else {
return offset;
}
}
/**
* Sort and merge ranges to optimize the access from the underlying file
* system.
* The motivations are that:
* <ul>
* <li>Upper layers want to pass down logical file ranges.</li>
* <li>Fewer reads have better performance.</li>
* <li>Applications want callbacks as ranges are read.</li>
* <li>Some file systems want to round ranges to be at checksum boundaries.</li>
* </ul>
*
* @param input the list of input ranges
* @param chunkSize round the start and end points to multiples of chunkSize
* @param minimumSeek the smallest gap that we should seek over in bytes
* @param maxSize the largest combined file range in bytes
* @return the list of sorted CombinedFileRanges that cover the input
*/
public static List<CombinedFileRange> sortAndMergeRanges(List<? extends FileRange> input,
int chunkSize,
int minimumSeek,
int maxSize) {
// sort the ranges by offset
FileRange[] ranges = input.toArray(new FileRange[0]);
Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
CombinedFileRange current = null;
List<CombinedFileRange> result = new ArrayList<>(ranges.length);
// now merge together the ones that merge
for(FileRange range: ranges) {
long start = roundDown(range.getOffset(), chunkSize);
long end = roundUp(range.getOffset() + range.getLength(), chunkSize);
if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) {
current = new CombinedFileRange(start, end, range);
result.add(current);
}
}
return result;
}
/**
* Slice the data that was read to the user's request.
* This function assumes that the user's request is completely subsumed by the
* read data. This always creates a new buffer pointing to the same underlying
* data but with its own mark and position fields such that reading one buffer
* can't effect other's mark and position.
* @param readData the buffer with the readData
* @param readOffset the offset in the file for the readData
* @param request the user's request
* @return the readData buffer that is sliced to the user's request
*/
public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
FileRange request) {
int offsetChange = (int) (request.getOffset() - readOffset);
int requestLength = request.getLength();
readData = readData.slice();
readData.position(offsetChange);
readData.limit(offsetChange + requestLength);
return readData;
}
/**
* private constructor.
*/
private VectoredReadUtils() {
throw new UnsupportedOperationException();
}
}

View File

@ -443,6 +443,37 @@ The semantics of this are exactly equivalent to
That is, the buffer is filled entirely with the contents of the input source
from position `position`
### `default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)`
Read fully data for a list of ranges asynchronously. The default implementation
iterates through the ranges, tries to coalesce the ranges based on values of
`minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged
ranges synchronously, but the intent is sub classes can implement efficient
implementation.
#### Preconditions
For each requested range:
range.getOffset >= 0 else raise IllegalArgumentException
range.getLength >= 0 else raise EOFException
#### Postconditions
For each requested range:
range.getData() returns CompletableFuture<ByteBuffer> which will have data
from range.getOffset to range.getLength.
### `minSeekForVectorReads()`
Smallest reasonable seek. Two ranges won't be merged together if the difference between
end of first and start of next range is more than this value.
### `maxReadSizeForVectorReads()`
Maximum number of bytes which can be read in one go after merging the ranges.
Two ranges won't be merged if the combined data to be read is more than this value.
## Consistency

View File

@ -0,0 +1,375 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.IntFunction;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
public static final int DATASET_LEN = 64 * 1024;
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
private final IntFunction<ByteBuffer> allocate;
private final String bufferType;
@Parameterized.Parameters(name = "Buffer type : {0}")
public static List<String> params() {
return Arrays.asList("direct", "array");
}
public AbstractContractVectoredReadTest(String bufferType) {
this.bufferType = bufferType;
this.allocate = "array".equals(bufferType) ?
ByteBuffer::allocate : ByteBuffer::allocateDirect;
}
@Override
public void setup() throws Exception {
super.setup();
Path path = path(VECTORED_READ_FILE_NAME);
FileSystem fs = getFileSystem();
createFile(fs, path, true, DATASET);
Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
createFile(fs, bigFile, true, DATASET_MB);
}
@Test
public void testVectoredReadMultipleRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
for (int i = 0; i < 10; i++) {
FileRange fileRange = new FileRangeImpl(i * 100, 100);
fileRanges.add(fileRange);
}
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
int i = 0;
for (FileRange res : fileRanges) {
completableFutures[i++] = res.getData();
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
combinedFuture.get();
validateVectoredReadResult(fileRanges);
}
}
@Test
public void testVectoredReadAndReadFully() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(100, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
byte[] readFullRes = new byte[100];
in.readFully(100, readFullRes);
ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
Assertions.assertThat(vecRes)
.describedAs("Result from vectored read and readFully must match")
.isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
}
}
/**
* As the minimum seek value is 4*1024,none of the below ranges
* will get merged.
*/
@Test
public void testDisjointRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 100));
fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100));
fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
/**
* As the minimum seek value is 4*1024, all the below ranges
* will get merged into one.
*/
@Test
public void testAllRangesMergedIntoOne() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 100));
fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100));
fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
/**
* As the minimum seek value is 4*1024, the first three ranges will be
* merged into and other two will remain as it is.
*/
@Test
public void testSomeRangesMergedSomeUnmerged() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 100));
fileRanges.add(new FileRangeImpl(14*1024, 100));
fileRanges.add(new FileRangeImpl(10*1024, 100));
fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100));
fileRanges.add(new FileRangeImpl(40*1024, 1024));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
public void testSameRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
@Test
public void testVectoredRead1MBFile() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(1293, 25837));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
FileRange resRange = fileRanges.get(0);
assertDatasetEquals((int) resRange.getOffset(), "vecRead",
vecRes, resRange.getLength(), DATASET_MB);
}
}
@Test
public void testOverlappingRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 1000));
fileRanges.add(new FileRangeImpl(90, 900));
fileRanges.add(new FileRangeImpl(50, 900));
fileRanges.add(new FileRangeImpl(10, 980));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
@Test
public void testEOFRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData();
try {
ByteBuffer buffer = data.get();
// Shouldn't reach here.
Assert.fail("EOFException must be thrown while reading EOF");
} catch (ExecutionException ex) {
// ignore as expected.
} catch (Exception ex) {
LOG.error("Exception while running vectored read ", ex);
Assert.fail("Exception while running vectored read " + ex);
}
}
}
}
@Test
public void testNegativeLengthRange() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, -50));
testExceptionalVectoredRead(fs, fileRanges, "Exception is expected");
}
@Test
public void testNegativeOffsetRange() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(-1, 50));
testExceptionalVectoredRead(fs, fileRanges, "Exception is expected");
}
@Test
public void testNormalReadAfterVectoredRead() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = createSomeOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
// read starting 200 bytes
byte[] res = new byte[200];
in.read(res, 0, 200);
ByteBuffer buffer = ByteBuffer.wrap(res);
assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
Assertions.assertThat(in.getPos())
.describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200);
validateVectoredReadResult(fileRanges);
}
}
@Test
public void testVectoredReadAfterNormalRead() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = createSomeOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
// read starting 200 bytes
byte[] res = new byte[200];
in.read(res, 0, 200);
ByteBuffer buffer = ByteBuffer.wrap(res);
assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
Assertions.assertThat(in.getPos())
.describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200);
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges);
}
}
@Test
public void testMultipleVectoredReads() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges1 = createSomeOverlappingRanges();
List<FileRange> fileRanges2 = createSomeOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges1, allocate);
in.readVectored(fileRanges2, allocate);
validateVectoredReadResult(fileRanges2);
validateVectoredReadResult(fileRanges1);
}
}
protected List<FileRange> createSomeOverlappingRanges() {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 100));
fileRanges.add(new FileRangeImpl(90, 50));
return fileRanges;
}
protected void validateVectoredReadResult(List<FileRange> fileRanges)
throws ExecutionException, InterruptedException {
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
int i = 0;
for (FileRange res : fileRanges) {
completableFutures[i++] = res.getData();
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
combinedFuture.get();
for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData();
try {
ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
} catch (Exception ex) {
LOG.error("Exception while running vectored read ", ex);
Assert.fail("Exception while running vectored read " + ex);
}
}
}
protected void testExceptionalVectoredRead(FileSystem fs,
List<FileRange> fileRanges,
String s) throws IOException {
boolean exRaised = false;
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
// Can we intercept here as done in S3 tests ??
in.readVectored(fileRanges, allocate);
} catch (EOFException | IllegalArgumentException ex) {
// expected.
exRaised = true;
}
Assertions.assertThat(exRaised)
.describedAs(s)
.isTrue();
}
/**
* Assert that the data read matches the dataset at the given offset.
* This helps verify that the seek process is moving the read pointer
* to the correct location in the file.
* @param readOffset the offset in the file where the read began.
* @param operation operation name for the assertion.
* @param data data read in.
* @param length length of data to check.
* @param originalData
*/
private void assertDatasetEquals(
final int readOffset, final String operation,
final ByteBuffer data,
int length, byte[] originalData) {
for (int i = 0; i < length; i++) {
int o = readOffset + i;
assertEquals(operation + " with read offset " + readOffset
+ ": data[" + i + "] != DATASET[" + o + "]",
originalData[o], data.get());
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.localfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest {
public TestLocalFSContractVectoredRead(String bufferType) {
super(bufferType);
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.rawlocal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestRawLocalContractVectoredRead extends AbstractContractVectoredReadTest {
public TestRawLocalContractVectoredRead(String bufferType) {
super(bufferType);
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RawlocalFSContract(conf);
}
}

View File

@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully;
import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally;
/**
* Test behavior of {@link VectoredReadUtils}.
*/
public class TestVectoredReadUtils extends HadoopTestBase {
@Test
public void testSliceTo() {
final int size = 64 * 1024;
ByteBuffer buffer = ByteBuffer.allocate(size);
// fill the buffer with data
IntBuffer intBuffer = buffer.asIntBuffer();
for(int i=0; i < size / Integer.BYTES; ++i) {
intBuffer.put(i);
}
// ensure we don't make unnecessary slices
ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100,
new FileRangeImpl(100, size));
Assertions.assertThat(buffer)
.describedAs("Slicing on the same offset shouldn't " +
"create a new buffer")
.isEqualTo(slice);
// try slicing a range
final int offset = 100;
final int sliceStart = 1024;
final int sliceLength = 16 * 1024;
slice = VectoredReadUtils.sliceTo(buffer, offset,
new FileRangeImpl(offset + sliceStart, sliceLength));
// make sure they aren't the same, but use the same backing data
Assertions.assertThat(buffer)
.describedAs("Slicing on new offset should " +
"create a new buffer")
.isNotEqualTo(slice);
Assertions.assertThat(buffer.array())
.describedAs("Slicing should use the same underlying " +
"data")
.isEqualTo(slice.array());
// test the contents of the slice
intBuffer = slice.asIntBuffer();
for(int i=0; i < sliceLength / Integer.BYTES; ++i) {
assertEquals("i = " + i, i + sliceStart / Integer.BYTES, intBuffer.get());
}
}
@Test
public void testRounding() {
for(int i=5; i < 10; ++i) {
assertEquals("i = "+ i, 5, VectoredReadUtils.roundDown(i, 5));
assertEquals("i = "+ i, 10, VectoredReadUtils.roundUp(i+1, 5));
}
assertEquals("Error while roundDown", 13, VectoredReadUtils.roundDown(13, 1));
assertEquals("Error while roundUp", 13, VectoredReadUtils.roundUp(13, 1));
}
@Test
public void testMerge() {
FileRange base = new FileRangeImpl(2000, 1000);
CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
// test when the gap between is too big
assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000,
new FileRangeImpl(5000, 1000), 2000, 4000));
assertEquals("Number of ranges in merged range shouldn't increase",
1, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 2000, mergeBase.getOffset());
assertEquals("post merge length", 1000, mergeBase.getLength());
// test when the total size gets exceeded
assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
new FileRangeImpl(5000, 1000), 2001, 3999));
assertEquals("Number of ranges in merged range shouldn't increase",
1, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 2000, mergeBase.getOffset());
assertEquals("post merge length", 1000, mergeBase.getLength());
// test when the merge works
assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
new FileRangeImpl(5000, 1000), 2001, 4000));
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 2000, mergeBase.getOffset());
assertEquals("post merge length", 4000, mergeBase.getLength());
// reset the mergeBase and test with a 10:1 reduction
mergeBase = new CombinedFileRange(200, 300, base);
assertEquals(200, mergeBase.getOffset());
assertEquals(100, mergeBase.getLength());
assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
new FileRangeImpl(5000, 1000), 201, 400));
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 200, mergeBase.getOffset());
assertEquals("post merge length", 400, mergeBase.getLength());
}
@Test
public void testSortAndMerge() {
List<FileRange> input = Arrays.asList(
new FileRangeImpl(3000, 100),
new FileRangeImpl(2100, 100),
new FileRangeImpl(1000, 100)
);
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
input, 100, 1001, 2500);
assertEquals("merged range size", 1, outputList.size());
CombinedFileRange output = outputList.get(0);
assertEquals("merged range underlying size", 3, output.getUnderlying().size());
assertEquals("range[1000,3100)", output.toString());
assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
// the minSeek doesn't allow the first two to merge
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100);
assertEquals("merged range size", 2, outputList.size());
assertEquals("range[1000,1100)", outputList.get(0).toString());
assertEquals("range[2100,3100)", outputList.get(1).toString());
assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
// the maxSize doesn't allow the third range to merge
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099);
assertEquals("merged range size", 2, outputList.size());
assertEquals("range[1000,2200)", outputList.get(0).toString());
assertEquals("range[3000,3100)", outputList.get(1).toString());
assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
// test the round up and round down (the maxSize doesn't allow any merges)
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100);
assertEquals("merged range size", 3, outputList.size());
assertEquals("range[992,1104)", outputList.get(0).toString());
assertEquals("range[2096,2208)", outputList.get(1).toString());
assertEquals("range[2992,3104)", outputList.get(2).toString());
assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
}
@Test
public void testSortAndMergeMoreCases() throws Exception {
List<FileRange> input = Arrays.asList(
new FileRangeImpl(3000, 110),
new FileRangeImpl(3000, 100),
new FileRangeImpl(2100, 100),
new FileRangeImpl(1000, 100)
);
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
input, 1, 1001, 2500);
assertEquals("merged range size", 1, outputList.size());
CombinedFileRange output = outputList.get(0);
assertEquals("merged range underlying size", 4, output.getUnderlying().size());
assertEquals("range[1000,3110)", output.toString());
assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
outputList = VectoredReadUtils.sortAndMergeRanges(
input, 100, 1001, 2500);
assertEquals("merged range size", 1, outputList.size());
output = outputList.get(0);
assertEquals("merged range underlying size", 4, output.getUnderlying().size());
assertEquals("range[1000,3200)", output.toString());
assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
}
interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
// nothing
}
static void fillBuffer(ByteBuffer buffer) {
byte b = 0;
while (buffer.remaining() > 0) {
buffer.put(b++);
}
}
@Test
public void testReadRangeFromByteBufferPositionedReadable() throws Exception {
Stream stream = Mockito.mock(Stream.class);
Mockito.doAnswer(invocation -> {
fillBuffer(invocation.getArgument(1));
return null;
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
CompletableFuture<ByteBuffer> result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
ByteBuffer::allocate);
assertFutureCompletedSuccessfully(result);
ByteBuffer buffer = result.get();
assertEquals("Size of result buffer", 100, buffer.remaining());
byte b = 0;
while (buffer.remaining() > 0) {
assertEquals("remain = " + buffer.remaining(), b++, buffer.get());
}
// test an IOException
Mockito.reset(stream);
Mockito.doThrow(new IOException("foo"))
.when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
ByteBuffer::allocate);
assertFutureFailedExceptionally(result);
}
static void runReadRangeFromPositionedReadable(IntFunction<ByteBuffer> allocate)
throws Exception {
PositionedReadable stream = Mockito.mock(PositionedReadable.class);
Mockito.doAnswer(invocation -> {
byte b=0;
byte[] buffer = invocation.getArgument(1);
for(int i=0; i < buffer.length; ++i) {
buffer[i] = b++;
}
return null;
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
ArgumentMatchers.anyInt());
CompletableFuture<ByteBuffer> result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
allocate);
assertFutureCompletedSuccessfully(result);
ByteBuffer buffer = result.get();
assertEquals("Size of result buffer", 100, buffer.remaining());
byte b = 0;
while (buffer.remaining() > 0) {
assertEquals("remain = " + buffer.remaining(), b++, buffer.get());
}
// test an IOException
Mockito.reset(stream);
Mockito.doThrow(new IOException("foo"))
.when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
ArgumentMatchers.anyInt());
result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
ByteBuffer::allocate);
assertFutureFailedExceptionally(result);
}
@Test
public void testReadRangeArray() throws Exception {
runReadRangeFromPositionedReadable(ByteBuffer::allocate);
}
@Test
public void testReadRangeDirect() throws Exception {
runReadRangeFromPositionedReadable(ByteBuffer::allocateDirect);
}
static void validateBuffer(String message, ByteBuffer buffer, int start) {
byte expected = (byte) start;
while (buffer.remaining() > 0) {
assertEquals(message + " remain: " + buffer.remaining(), expected++,
buffer.get());
}
}
@Test
public void testReadVectored() throws Exception {
List<FileRange> input = Arrays.asList(new FileRangeImpl(0, 100),
new FileRangeImpl(100_000, 100),
new FileRangeImpl(200_000, 100));
Stream stream = Mockito.mock(Stream.class);
Mockito.doAnswer(invocation -> {
fillBuffer(invocation.getArgument(1));
return null;
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
// should not merge the ranges
VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100);
Mockito.verify(stream, Mockito.times(3))
.readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
for(int b=0; b < input.size(); ++b) {
validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
}
}
@Test
public void testReadVectoredMerge() throws Exception {
List<FileRange> input = Arrays.asList(new FileRangeImpl(2000, 100),
new FileRangeImpl(1000, 100),
new FileRangeImpl(0, 100));
Stream stream = Mockito.mock(Stream.class);
Mockito.doAnswer(invocation -> {
fillBuffer(invocation.getArgument(1));
return null;
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
// should merge the ranges into a single read
VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100);
Mockito.verify(stream, Mockito.times(1))
.readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
for(int b=0; b < input.size(); ++b) {
validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000);
}
}
}

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.test;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
/**
@ -28,17 +31,18 @@ public class MoreAsserts {
/**
* Assert equivalence for array and iterable
* @param <T> the type of the elements
* @param s the name/message for the collection
* @param expected the expected array of elements
* @param actual the actual iterable of elements
*
* @param <T> the type of the elements
* @param s the name/message for the collection
* @param expected the expected array of elements
* @param actual the actual iterable of elements
*/
public static <T> void assertEquals(String s, T[] expected,
Iterable<T> actual) {
Iterator<T> it = actual.iterator();
int i = 0;
for (; i < expected.length && it.hasNext(); ++i) {
Assert.assertEquals("Element "+ i +" for "+ s, expected[i], it.next());
Assert.assertEquals("Element " + i + " for " + s, expected[i], it.next());
}
Assert.assertTrue("Expected more elements", i == expected.length);
Assert.assertTrue("Expected less elements", !it.hasNext());
@ -46,7 +50,8 @@ public static <T> void assertEquals(String s, T[] expected,
/**
* Assert equality for two iterables
* @param <T> the type of the elements
*
* @param <T> the type of the elements
* @param s
* @param expected
* @param actual
@ -57,10 +62,28 @@ public static <T> void assertEquals(String s, Iterable<T> expected,
Iterator<T> ita = actual.iterator();
int i = 0;
while (ite.hasNext() && ita.hasNext()) {
Assert.assertEquals("Element "+ i +" for "+s, ite.next(), ita.next());
Assert.assertEquals("Element " + i + " for " + s, ite.next(), ita.next());
}
Assert.assertTrue("Expected more elements", !ite.hasNext());
Assert.assertTrue("Expected less elements", !ita.hasNext());
}
public static <T> void assertFutureCompletedSuccessfully(CompletableFuture<T> future) {
Assertions.assertThat(future.isDone())
.describedAs("This future is supposed to be " +
"completed successfully")
.isTrue();
Assertions.assertThat(future.isCompletedExceptionally())
.describedAs("This future is supposed to be " +
"completed successfully")
.isFalse();
}
public static <T> void assertFutureFailedExceptionally(CompletableFuture<T> future) {
Assertions.assertThat(future.isCompletedExceptionally())
.describedAs("This future is supposed to be " +
"completed exceptionally")
.isTrue();
}
}

View File

@ -56,5 +56,4 @@
</plugin>
</plugins>
</build>
</project>

View File

@ -219,6 +219,7 @@
<nodejs.version>v12.22.1</nodejs.version>
<yarnpkg.version>v1.22.5</yarnpkg.version>
<apache-ant.version>1.10.11</apache-ant.version>
<jmh.version>1.20</jmh.version>
</properties>
<dependencyManagement>
@ -1589,6 +1590,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>

View File

@ -1463,11 +1463,12 @@ private FSDataInputStream executeOpen(
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);
return new FSDataInputStream(
new S3AInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats));
new S3AInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats,
unboundedThreadPool));
}
/**
@ -4926,9 +4927,8 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
/**
* This is a proof of concept of a select API.
* @param source path to source data
* @param expression select expression
* @param options request configuration from the builder.
* @param providedStatus any passed in status
* @param fileInformation any passed in information.
* @return the stream of the results
* @throws IOException IO failure
*/

View File

@ -19,38 +19,49 @@
package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.IntFunction;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint;
import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo;
import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
@ -88,6 +99,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;
/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
* operation. This is to ensure that when a big range of data is
* requested in direct byte buffer doesn't leads to OOM errors.
*/
private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
/**
* This is the public position; the one set in {@link #seek(long)}
@ -111,6 +129,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
private S3ObjectInputStream wrappedStream;
private final S3AReadOpContext context;
private final InputStreamCallbacks client;
/**
* Thread pool used for vectored IO operation.
*/
private final ThreadPoolExecutor unboundedThreadPool;
private final String bucket;
private final String key;
private final String pathStr;
@ -160,12 +183,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param ctx operation context
* @param s3Attributes object attributes
* @param client S3 client to use
* @param streamStatistics statistics for this stream
* @param unboundedThreadPool thread pool to use.
*/
public S3AInputStream(S3AReadOpContext ctx,
S3ObjectAttributes s3Attributes,
InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
S3ObjectAttributes s3Attributes,
InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics,
ThreadPoolExecutor unboundedThreadPool) {
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
"No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@ -187,6 +211,7 @@ public S3AInputStream(S3AReadOpContext ctx,
setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead());
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
this.unboundedThreadPool = unboundedThreadPool;
}
/**
@ -880,6 +905,231 @@ public void readFully(long position, byte[] buffer, int offset, int length)
}
}
/**
* {@inheritDoc}
* Vectored read implementation for S3AInputStream.
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
* @throws IOException IOE if any.
*/
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
checkNotClosed();
for (FileRange range : ranges) {
validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
range.setData(result);
}
if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
for(FileRange range: ranges) {
ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size());
for(CombinedFileRange combinedFileRange: combinedFileRanges) {
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
combinedFileRange.setData(result);
unboundedThreadPool.submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
}
}
LOG.debug("Finished submitting vectored read to threadpool" +
" on path {} for ranges {} ", pathStr, ranges);
}
/**
* Read data in the combinedFileRange and update data in buffers
* of all underlying ranges.
* @param combinedFileRange combined range.
* @param buffer combined buffer.
*/
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
ByteBuffer buffer) {
// Not putting read single range call inside try block as
// exception if any occurred during this call will be raised
// during awaitFuture call while getting the combined buffer.
readSingleRange(combinedFileRange, buffer);
try {
// In case of single range we return the original byte buffer else
// we return slice byte buffers for each child ranges.
ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
if (combinedFileRange.getUnderlying().size() == 1) {
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
} else {
for (FileRange child : combinedFileRange.getUnderlying()) {
updateOriginalRange(child, combinedBuffer, combinedFileRange);
}
}
} catch (Exception ex) {
LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex);
for(FileRange child : combinedFileRange.getUnderlying()) {
child.getData().completeExceptionally(ex);
}
}
}
/**
* Update data in child range from combined range.
* @param child child range.
* @param combinedBuffer combined buffer.
* @param combinedFileRange combined range.
*/
private void updateOriginalRange(FileRange child,
ByteBuffer combinedBuffer,
CombinedFileRange combinedFileRange) {
LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ",
child.getOffset(), child.getLength(),
combinedFileRange.getOffset(), combinedFileRange.getLength());
ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child);
child.getData().complete(childBuffer);
LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ",
child.getOffset(), child.getLength(),
combinedFileRange.getOffset(), combinedFileRange.getLength());
}
/**
* // Check if we can use contentLength returned by http GET request.
* Validates range parameters.
* @param range requested range.
* @throws EOFException end of file exception.
*/
private void validateRangeRequest(FileRange range) throws EOFException {
VectoredReadUtils.validateRangeRequest(range);
if(range.getOffset() + range.getLength() > contentLength) {
LOG.warn("Requested range [{}, {}) is beyond EOF for path {}",
range.getOffset(), range.getLength(), pathStr);
throw new EOFException("Requested range [" + range.getOffset() +", "
+ range.getLength() + ") is beyond EOF for path " + pathStr);
}
}
/**
* TODO: Add retry in client.getObject(). not present in older reads why here??
* Okay retry is being done in the top layer during read.
* But if we do here in the top layer, one issue I am thinking is
* what if there is some error which happened during filling the buffer
* If we retry that old offsets of heap buffers can be overwritten ?
* I think retry should be only added in {@link S3AInputStream#getS3Object}
* Read data from S3 for this range and populate the bufffer.
* @param range range of data to read.
* @param buffer buffer to fill.
*/
private void readSingleRange(FileRange range, ByteBuffer buffer) {
LOG.debug("Start reading range {} from path {} ", range, pathStr);
S3Object objectRange = null;
S3ObjectInputStream objectContent = null;
try {
long position = range.getOffset();
int length = range.getLength();
final String operationName = "readRange";
objectRange = getS3Object(operationName, position, length);
objectContent = objectRange.getObjectContent();
if (objectContent == null) {
throw new PathIOException(uri,
"Null IO stream received during " + operationName);
}
populateBuffer(length, buffer, objectContent);
range.getData().complete(buffer);
} catch (Exception ex) {
LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex);
range.getData().completeExceptionally(ex);
} finally {
IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
}
LOG.debug("Finished reading range {} from path {} ", range, pathStr);
}
/**
* Populates the buffer with data from objectContent
* till length. Handles both direct and heap byte buffers.
* @param length length of data to populate.
* @param buffer buffer to fill.
* @param objectContent result retrieved from S3 store.
* @throws IOException any IOE.
*/
private void populateBuffer(int length,
ByteBuffer buffer,
S3ObjectInputStream objectContent) throws IOException {
if (buffer.isDirect()) {
int readBytes = 0;
int offset = 0;
byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
while (readBytes < length) {
int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
TMP_BUFFER_MAX_SIZE
: length - readBytes;
readByteArray(objectContent, tmp, 0, currentLength);
buffer.put(tmp, 0, currentLength);
offset = offset + currentLength;
readBytes = readBytes + currentLength;
}
buffer.flip();
} else {
readByteArray(objectContent, buffer.array(), 0, length);
}
}
public void readByteArray(S3ObjectInputStream objectContent,
byte[] dest,
int offset,
int length) throws IOException {
int readBytes = 0;
while (readBytes < length) {
int readBytesCurr = objectContent.read(dest,
offset + readBytes,
length - readBytes);
readBytes +=readBytesCurr;
if (readBytesCurr < 0) {
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
}
}
/**
* Read data from S3 using a http request.
* This also handles if file has been changed while http call
* is getting executed. If file has been changed RemoteFileChangedException
* is thrown.
* @param operationName name of the operation for which get object on S3 is called.
* @param position position of the object to be read from S3.
* @param length length from position of the object to be read from S3.
* @return S3Object
* @throws IOException exception if any.
*/
private S3Object getS3Object(String operationName, long position,
int length) throws IOException {
final GetObjectRequest request = client.newGetRequest(key)
.withRange(position, position + length - 1);
changeTracker.maybeApplyConstraint(request);
DurationTracker tracker = streamStatistics.initiateGetRequest();
S3Object objectRange;
Invoker invoker = context.getReadInvoker();
try {
objectRange = invoker.retry(operationName, pathStr, true,
() -> client.getObject(request));
} catch (IOException ex) {
tracker.failed();
throw ex;
} finally {
tracker.close();
}
changeTracker.processResponse(objectRange, operationName,
position);
return objectRange;
}
/**
* Access the input stream statistics.
* This is for internal testing and may be removed without warning.

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import java.util.ArrayList;
import java.util.List;
public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
public ITestS3AContractVectoredRead(String bufferType) {
super(bufferType);
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
/**
* Overriding in S3 vectored read api fails fast in case of EOF
* requested range.
* @throws Exception
*/
@Override
public void testEOFRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
}
}

View File

@ -111,7 +111,8 @@ private S3AInputStream getMockedS3AInputStream() {
s3AReadOpContext,
s3ObjectAttributes,
getMockedInputStreamCallback(),
s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics());
s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
null);
}
/**

View File

@ -52,7 +52,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN
# for debugging low level S3a operations, uncomment these lines
# Log all S3A classes
#log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
#log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO
#log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO

View File

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.4.0-SNAPSHOT</version>
<relativePath>../../hadoop-project/pom.xml</relativePath>
</parent>
<artifactId>hadoop-benchmark</artifactId>
<version>3.4.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Apache Hadoop Common Benchmark</name>
<description>Apache Hadoop Common Benchmark</description>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.benchmark.VectoredReadBenchmark</mainClass>
</manifest>
</archive>
<descriptors>
<descriptor>src/main/assembly/uber.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/src/main/findbugs/exclude.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/src/main/findbugs/exclude.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,33 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly>
<id>uber</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
<containerDescriptorHandlers>
<containerDescriptorHandler>
<handlerName>metaInf-services</handlerName>
</containerDescriptorHandler>
</containerDescriptorHandlers>
</assembly>

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
<Match>
<Class name="~org\.apache\.hadoop\.benchmark\.generated.*"/>
</Match>
<Match>
<Class name="~org\.openjdk\.jmh\.infra\.generated.*"/>
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.benchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.FileSystems;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class VectoredReadBenchmark {
static final Path DATA_PATH = getTestDataPath();
static final String DATA_PATH_PROPERTY = "bench.data";
static final int READ_SIZE = 64 * 1024;
static final long SEEK_SIZE = 1024L * 1024;
static Path getTestDataPath() {
String value = System.getProperty(DATA_PATH_PROPERTY);
return new Path(value == null ? "/tmp/taxi.orc" : value);
}
@State(Scope.Thread)
public static class FileSystemChoice {
@Param({"local", "raw"})
private String fileSystemKind;
private Configuration conf;
private FileSystem fs;
@Setup(Level.Trial)
public void setup() {
conf = new Configuration();
try {
LocalFileSystem local = FileSystem.getLocal(conf);
fs = "raw".equals(fileSystemKind) ? local.getRaw() : local;
} catch (IOException e) {
throw new IllegalArgumentException("Can't get filesystem", e);
}
}
}
@State(Scope.Thread)
public static class BufferChoice {
@Param({"direct", "array"})
private String bufferKind;
private IntFunction<ByteBuffer> allocate;
@Setup(Level.Trial)
public void setup() {
allocate = "array".equals(bufferKind)
? ByteBuffer::allocate : ByteBuffer::allocateDirect;
}
}
@Benchmark
public void asyncRead(FileSystemChoice fsChoice,
BufferChoice bufferChoice,
Blackhole blackhole) throws Exception {
FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
List<FileRange> ranges = new ArrayList<>();
for(int m=0; m < 100; ++m) {
FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE);
ranges.add(range);
}
stream.readVectored(ranges, bufferChoice.allocate);
for(FileRange range: ranges) {
blackhole.consume(range.getData().get());
}
stream.close();
}
static class Joiner implements CompletionHandler<ByteBuffer, FileRange> {
private int remaining;
private final ByteBuffer[] result;
private Throwable exception = null;
Joiner(int total) {
remaining = total;
result = new ByteBuffer[total];
}
synchronized void finish() {
remaining -= 1;
if (remaining == 0) {
notify();
}
}
synchronized ByteBuffer[] join() throws InterruptedException, IOException {
while (remaining > 0 && exception == null) {
wait();
}
if (exception != null) {
throw new IOException("problem reading", exception);
}
return result;
}
@Override
public synchronized void completed(ByteBuffer buffer, FileRange attachment) {
result[--remaining] = buffer;
if (remaining == 0) {
notify();
}
}
@Override
public synchronized void failed(Throwable exc, FileRange attachment) {
this.exception = exc;
notify();
}
}
static class FileRangeCallback extends FileRangeImpl implements
CompletionHandler<Integer, FileRangeCallback> {
private final AsynchronousFileChannel channel;
private final ByteBuffer buffer;
private int completed = 0;
private final Joiner joiner;
FileRangeCallback(AsynchronousFileChannel channel, long offset,
int length, Joiner joiner, ByteBuffer buffer) {
super(offset, length);
this.channel = channel;
this.joiner = joiner;
this.buffer = buffer;
}
@Override
public void completed(Integer result, FileRangeCallback attachment) {
final int bytes = result;
if (bytes == -1) {
failed(new EOFException("Read past end of file"), this);
}
completed += bytes;
if (completed < this.getLength()) {
channel.read(buffer, this.getOffset() + completed, this, this);
} else {
buffer.flip();
joiner.finish();
}
}
@Override
public void failed(Throwable exc, FileRangeCallback attachment) {
joiner.failed(exc, this);
}
}
@Benchmark
public void asyncFileChanArray(BufferChoice bufferChoice,
Blackhole blackhole) throws Exception {
java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString());
AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
List<FileRangeImpl> ranges = new ArrayList<>();
Joiner joiner = new Joiner(100);
for(int m=0; m < 100; ++m) {
ByteBuffer buffer = bufferChoice.allocate.apply(READ_SIZE);
FileRangeCallback range = new FileRangeCallback(channel, m * SEEK_SIZE,
READ_SIZE, joiner, buffer);
ranges.add(range);
channel.read(buffer, range.getOffset(), range, range);
}
joiner.join();
channel.close();
blackhole.consume(ranges);
}
@Benchmark
public void syncRead(FileSystemChoice fsChoice,
Blackhole blackhole) throws Exception {
FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
List<byte[]> result = new ArrayList<>();
for(int m=0; m < 100; ++m) {
byte[] buffer = new byte[READ_SIZE];
stream.readFully(m * SEEK_SIZE, buffer);
result.add(buffer);
}
blackhole.consume(result);
stream.close();
}
/**
* Run the benchmarks.
* @param args the pathname of a 100MB data file
* @throws Exception any ex.
*/
public static void main(String[] args) throws Exception {
OptionsBuilder opts = new OptionsBuilder();
opts.include("VectoredReadBenchmark");
opts.jvmArgs("-server", "-Xms256m", "-Xmx2g",
"-D" + DATA_PATH_PROPERTY + "=" + args[0]);
opts.forks(1);
new Runner(opts.build()).run();
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Benchmark for Vectored Read IO operations.
*/
package org.apache.hadoop.benchmark;

View File

@ -51,6 +51,7 @@
<module>hadoop-azure-datalake</module>
<module>hadoop-aliyun</module>
<module>hadoop-fs2img</module>
<module>hadoop-benchmark</module>
</modules>
<build>

View File

@ -550,6 +550,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
<exclude>licenses-binary/**</exclude>
<exclude>dev-support/docker/pkg-resolver/packages.json</exclude>
<exclude>dev-support/docker/pkg-resolver/platforms.json</exclude>
<exclude>**/target/**</exclude>
</excludes>
</configuration>
</plugin>