diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
index 9e1551e8a9..50cab7dc4c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
@@ -30,6 +30,7 @@
 
 import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.functional.Function4RaisingIOE;
 
 /**
  * Utility class which implements helper methods used
@@ -37,6 +38,8 @@
  */
 public final class VectoredReadUtils {
 
+  private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
+
   /**
    * Validate a single range.
    * @param range file range.
@@ -114,7 +117,14 @@ private static void readNonByteBufferPositionedReadable(PositionedReadable stream,
                                                           FileRange range,
                                                           ByteBuffer buffer) throws IOException {
     if (buffer.isDirect()) {
-      buffer.put(readInDirectBuffer(stream, range));
+      readInDirectBuffer(range.getLength(),
+          buffer,
+          (position, buffer1, offset, length) -> {
+            // position counts from the start of the range, so the
+            // read must be offset by the range's own file offset.
+            stream.readFully(range.getOffset() + position, buffer1, offset, length);
+            return null;
+          });
       buffer.flip();
     } else {
       stream.readFully(range.getOffset(), buffer.array(),
@@ -122,13 +132,34 @@ private static void readNonByteBufferPositionedReadable(PositionedReadable stream,
     }
   }
 
-  private static byte[] readInDirectBuffer(PositionedReadable stream,
-                                           FileRange range) throws IOException {
-    // if we need to read data from a direct buffer and the stream doesn't
-    // support it, we allocate a byte array to use.
-    byte[] tmp = new byte[range.getLength()];
-    stream.readFully(range.getOffset(), tmp, 0, tmp.length);
-    return tmp;
+  /**
+   * Read bytes from stream into a byte buffer using an
+   * intermediate byte array.
+   * @param length number of bytes to read.
+   * @param buffer buffer to fill.
+   * @param operation operation to use for reading data.
+   * @throws IOException any IOE.
+   */
+  public static void readInDirectBuffer(int length,
+                                        ByteBuffer buffer,
+                                        Function4RaisingIOE<Integer, byte[], Integer, Integer, Void> operation)
+      throws IOException {
+    if (length == 0) {
+      return;
+    }
+    int readBytes = 0;
+    int position = 0;
+    int tmpBufferMaxSize = Math.min(TMP_BUFFER_MAX_SIZE, length);
+    byte[] tmp = new byte[tmpBufferMaxSize];
+    while (readBytes < length) {
+      int currentLength = (readBytes + tmpBufferMaxSize) < length ?
+              tmpBufferMaxSize
+              : (length - readBytes);
+      operation.apply(position, tmp, 0, currentLength);
+      buffer.put(tmp, 0, currentLength);
+      position = position + currentLength;
+      readBytes = readBytes + currentLength;
+    }
   }
 
   /**
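For illustration only (not part of the patch): a minimal sketch of how the new helper drives a large read through the bounded temporary array. The `ReadInDirectBufferDemo` class and its in-memory `source` array are hypothetical stand-ins for a real stream; only `VectoredReadUtils.readInDirectBuffer` comes from the change above.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.fs.VectoredReadUtils;

    public class ReadInDirectBufferDemo {
      public static void main(String[] args) throws IOException {
        // Hypothetical in-memory "file" of 150,000 bytes.
        byte[] source = new byte[150_000];
        for (int i = 0; i < source.length; i++) {
          source[i] = (byte) i;
        }
        ByteBuffer buffer = ByteBuffer.allocateDirect(source.length);

        // The operation is invoked once per chunk with
        // (position, tmp, offset, length); position is the number of
        // bytes already delivered, so 150,000 bytes arrive as chunks
        // of 65,536 + 65,536 + 18,928.
        VectoredReadUtils.readInDirectBuffer(source.length, buffer,
            (position, tmp, offset, length) -> {
              System.arraycopy(source, position, tmp, offset, length);
              return null;
            });
        buffer.flip(); // the helper fills but never flips; callers do that
      }
    }
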
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Function4RaisingIOE.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Function4RaisingIOE.java
new file mode 100644
index 0000000000..f0cd5c08c5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Function4RaisingIOE.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+
+/**
+ * Function of arity 4 which may raise an IOException.
+ * @param <I1> type of arg1.
+ * @param <I2> type of arg2.
+ * @param <I3> type of arg3.
+ * @param <I4> type of arg4.
+ * @param <R> return type.
+ */
+public interface Function4RaisingIOE<I1, I2, I3, I4, R> {
+
+  /**
+   * Apply the function.
+   * @param i1 argument 1.
+   * @param i2 argument 2.
+   * @param i3 argument 3.
+   * @param i4 argument 4.
+   * @return return value.
+   * @throws IOException any IOE.
+   */
+  R apply(I1 i1, I2 i2, I3 i3, I4 i4) throws IOException;
+}
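A standalone usage sketch of the new interface, under stated assumptions: `Function4Example` is hypothetical and a local file `example.bin` exists. The `reader` lambda fills the same role the `stream.readFully` lambda plays in `VectoredReadUtils` above.

    import java.io.IOException;
    import java.io.RandomAccessFile;

    import org.apache.hadoop.util.functional.Function4RaisingIOE;

    public class Function4Example {
      public static void main(String[] args) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile("example.bin", "r")) {
          // Arity-4 operation: seek to position, then fill
          // tmp[offset..offset+length) from the file.
          Function4RaisingIOE<Integer, byte[], Integer, Integer, Void> reader =
              (position, tmp, offset, length) -> {
                file.seek(position);
                file.readFully(tmp, offset, length);
                return null;
              };
          byte[] tmp = new byte[16];
          reader.apply(0, tmp, 0, tmp.length); // read the first 16 bytes
        }
      }
    }

Because the interface has a single abstract method, any such lambda or method reference satisfies it, which is what lets `VectoredReadUtils.readInDirectBuffer` stay agnostic about where the bytes come from.
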
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
index f473036d7e..ebf0e14053 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
@@ -386,17 +386,31 @@ public void testReadVectored() throws Exception {
     List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 100),
         FileRange.createFileRange(100_000, 100),
         FileRange.createFileRange(200_000, 100));
+    runAndValidateVectoredRead(input);
+  }
+
+  @Test
+  public void testReadVectoredZeroBytes() throws Exception {
+    List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 0),
+            FileRange.createFileRange(100_000, 100),
+            FileRange.createFileRange(200_000, 0));
+    runAndValidateVectoredRead(input);
+  }
+
+
+  private void runAndValidateVectoredRead(List<FileRange> input)
+          throws Exception {
     Stream stream = Mockito.mock(Stream.class);
     Mockito.doAnswer(invocation -> {
       fillBuffer(invocation.getArgument(1));
       return null;
     }).when(stream).readFully(ArgumentMatchers.anyLong(),
-        ArgumentMatchers.any(ByteBuffer.class));
+            ArgumentMatchers.any(ByteBuffer.class));
     // should not merge the ranges
     VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
     Mockito.verify(stream, Mockito.times(3))
-        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
-    for(int b=0; b < input.size(); ++b) {
+            .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
+    for (int b = 0; b < input.size(); ++b) {
       validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7604178b2f..7ef1e216cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -1054,20 +1054,13 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
   private void populateBuffer(int length,
                               ByteBuffer buffer,
                               S3ObjectInputStream objectContent) throws IOException {
+
     if (buffer.isDirect()) {
-      int readBytes = 0;
-      int offset = 0;
-      byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
-      while (readBytes < length) {
-        checkIfVectoredIOStopped();
-        int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
-                TMP_BUFFER_MAX_SIZE
-                : length - readBytes;
-        readByteArray(objectContent, tmp, 0, currentLength);
-        buffer.put(tmp, 0, currentLength);
-        offset = offset + currentLength;
-        readBytes = readBytes + currentLength;
-      }
+      VectoredReadUtils.readInDirectBuffer(length, buffer,
+          (position, tmp, offset, currentLength) -> {
+            readByteArray(objectContent, tmp, offset, currentLength);
+            return null;
+          });
       buffer.flip();
     } else {
       readByteArray(objectContent, buffer.array(), 0, length);
@@ -1076,6 +1069,7 @@ private void populateBuffer(int length,
     incrementBytesRead(length);
   }
 
+
   /**
   * Read data into destination buffer from s3 object content.
   * @param objectContent result from S3.
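One property of the refactoring worth noting: the S3A lambda ignores the `position` argument because `objectContent` is already positioned at the start of the requested range and is consumed sequentially. A minimal sketch of that pattern against a plain `InputStream`; the `SequentialStreamDemo` class and its in-memory stream are hypothetical, and the read loop only mirrors what S3A's `readByteArray` does.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.fs.VectoredReadUtils;

    public class SequentialStreamDemo {
      public static void main(String[] args) throws IOException {
        // Stand-in for S3ObjectInputStream: already positioned at the
        // start of the range, so only sequential reads are needed.
        InputStream objectContent = new ByteArrayInputStream(new byte[100_000]);
        ByteBuffer buffer = ByteBuffer.allocateDirect(100_000);

        VectoredReadUtils.readInDirectBuffer(100_000, buffer,
            (position, tmp, offset, currentLength) -> {
              // position is ignored, as in the S3A populateBuffer lambda.
              int read = 0;
              while (read < currentLength) {
                int n = objectContent.read(tmp, offset + read,
                    currentLength - read);
                if (n < 0) {
                  throw new IOException("unexpected end of stream");
                }
                read += n;
              }
              return null;
            });
        buffer.flip();
      }
    }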