From f5a4b43a4944e1fcbdc86fc91c3a5179218cefdc Mon Sep 17 00:00:00 2001 From: Sahil Takiar Date: Wed, 6 Mar 2019 14:57:46 -0800 Subject: [PATCH] HDFS-14111. hdfsOpenFile on HDFS causes unnecessary IO from file offset 0. Contributed by Sahil Takiar. Signed-off-by: Wei-Chiu Chuang --- .../hadoop/crypto/CryptoInputStream.java | 1 + .../fs/ByteBufferPositionedReadable.java | 64 +++++ .../apache/hadoop/fs/StreamCapabilities.java | 6 + .../apache/hadoop/hdfs/DFSInputStream.java | 1 + .../src/main/native/libhdfs/hdfs.c | 29 +- .../native/libhdfspp/tests/hdfs_ext_test.cc | 5 +- .../fsdataset/impl/AddBlockPoolException.java | 27 ++ .../hadoop/hdfs/TestByteBufferPread.java | 269 ++++++++++++++++++ 8 files changed, 390 insertions(+), 12 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 5c879ecf41..67e8690456 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -740,6 +740,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.READBYTEBUFFER: return true; default: return false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java new file mode 100644 index 0000000000..873a521d00 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Implementers of this interface provide a positioned read API that writes to a + * {@link ByteBuffer} rather than a {@code byte[]}. 
+ * + * @see PositionedReadable + * @see ByteBufferReadable + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ByteBufferPositionedReadable { + /** + * Reads up to {@code buf.remaining()} bytes into buf from a given position + * in the file and returns the number of bytes read. Callers should use + * {@code buf.limit(...)} to control the size of the desired read and + * {@code buf.position(...)} to control the offset into the buffer the data + * should be written to. + *
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} should be unchanged.
+   * <p>
+   * In the case of an exception, the values of {@code buf.position()} and
+   * {@code buf.limit()} are undefined, and callers should be prepared to
+   * recover from this eventuality.
+   * <p>
+   * Many implementations will throw {@link UnsupportedOperationException}, so
+   * callers that are not confident in support for this method from the
+   * underlying filesystem should be prepared to handle that exception.
+   * <p>
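+   * For example, a caller that wants to fill the remainder of {@code buf}
+   * starting at {@code position} could loop until end-of-stream. This is an
+   * illustrative sketch only; {@code in} stands for any stream implementing
+   * this interface:
+   * <pre>{@code
+   *   int totalBytesRead = 0;
+   *   int bytesRead;
+   *   while ((bytesRead = in.read(position + totalBytesRead, buf)) > 0) {
+   *     totalBytesRead += bytesRead;
+   *   }
+   * }</pre>
+   * <p>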
+ * Implementations should treat 0-length requests as legitimate, and must not + * signal an error upon their receipt. + * + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @return the number of bytes read, possibly zero, or -1 if reached + * end-of-stream + * @throws IOException if there is some error performing the read + */ + int read(long position, ByteBuffer buf) throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 3549cdc4fa..c52d30762f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -59,6 +59,12 @@ public interface StreamCapabilities { */ String UNBUFFER = "in:unbuffer"; + /** + * Stream read(ByteBuffer) capability implemented by + * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}. + */ + String READBYTEBUFFER = "in:readbytebuffer"; + /** * Capabilities that a stream can support and be queried for. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index f47b88cb5a..a3e2ad5afc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1779,6 +1779,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.READBYTEBUFFER: return true; default: return false; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c index 0cced979ad..41caffd290 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c @@ -1013,7 +1013,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, return f{is|os}; */ int accmode = flags & O_ACCMODE; - jstring jStrBufferSize = NULL, jStrReplication = NULL; + jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL; jobject jConfiguration = NULL, jPath = NULL, jFile = NULL; jobject jFS = (jobject)fs; jthrowable jthr; @@ -1171,16 +1171,22 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, file->flags = 0; if ((flags & O_WRONLY) == 0) { - // Try a test read to see if we can do direct reads - char buf; - if (readDirect(fs, file, &buf, 0) == 0) { - // Success - 0-byte read should return 0 + // Check the StreamCapabilities of jFile to see if we can do direct reads + jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsOpenFile(%s): newJavaStr", path); + goto done; + } + jthr = invokeMethod(env, &jVal, INSTANCE, jFile, HADOOP_ISTRM, + "hasCapability", "(Ljava/lang/String;)Z", jCapabilityString); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsOpenFile(%s): FSDataInputStream#hasCapability", path); + goto done; + } + 
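+        // jVal.z holds the result of hasCapability("in:readbytebuffer");
+        // when true, mark the handle so reads can use the direct ByteBuffer
+        // path rather than falling back to byte[] reads.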
if (jVal.z) { file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; - } else if (errno != ENOTSUP) { - // Unexpected error. Clear it, don't set the direct flag. - fprintf(stderr, - "hdfsOpenFile(%s): WARN: Unexpected error %d when testing " - "for direct read compatibility\n", path, errno); } } ret = 0; @@ -1190,7 +1196,8 @@ done: destroyLocalReference(env, jStrReplication); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jPath); - destroyLocalReference(env, jFile); + destroyLocalReference(env, jFile); + destroyLocalReference(env, jCapabilityString); if (ret) { if (file) { if (file->file) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc index 19d95b47e6..79771f0d7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc @@ -503,7 +503,10 @@ TEST_F(HdfsExtTest, TestReadStats) { hdfsFileFreeReadStatistics(stats); EXPECT_EQ(0, hdfsCloseFile(fs, file)); - EXPECT_EQ(0, errno); + // Since libhdfs is not guaranteed to set errno to 0 on successful + // operations, we disable this check for now, see HDFS-14325 for a + // long term solution to this problem + // EXPECT_EQ(0, errno); } //Testing working directory diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java new file mode 100644 index 0000000000..1d2bca61b3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; + +/** + * This exception collects all IOExceptions thrown when adding block pools and + * scanning volumes. It keeps the information about which volume is associated + * with an exception. + * + */ +public class AddBlockPoolException extends IOException { + private Map unhealthyDataDirs; + public AddBlockPoolException(Map + unhealthyDataDirs) { + this.unhealthyDataDirs = unhealthyDataDirs; + } + + public Map getFailingVolumes() { + return unhealthyDataDirs; + } + @Override + public String toString() { + return getClass().getName() + ": " + unhealthyDataDirs.toString(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java new file mode 100644 index 0000000000..64f2d06ab1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the DFS positional read functionality on a single node + * mini-cluster. These tests are inspired from {@link TestPread}. The tests + * are much less comprehensive than other pread tests because pread already + * internally uses {@link ByteBuffer}s. + */ +public class TestByteBufferPread { + + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static byte[] fileContents; + private static Path testFile; + private static Random rand; + + private static final long SEED = 0xDEADBEEFL; + private static final int BLOCK_SIZE = 4096; + private static final int FILE_SIZE = 12 * BLOCK_SIZE; + + @BeforeClass + public static void setup() throws IOException { + // Setup the cluster with a small block size so we can create small files + // that span multiple blocks + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + fs = cluster.getFileSystem(); + + // Create a test file that spans 12 blocks, and contains a bunch of random + // bytes + fileContents = new byte[FILE_SIZE]; + rand = new Random(SEED); + rand.nextBytes(fileContents); + testFile = new Path("/byte-buffer-pread-test.dat"); + try (FSDataOutputStream out = fs.create(testFile, (short) 3)) { + out.write(fileContents); + } + } + + /** + * Test preads with {@link java.nio.HeapByteBuffer}s. + */ + @Test + public void testPreadWithHeapByteBuffer() throws IOException { + testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + } + + /** + * Test preads with {@link java.nio.DirectByteBuffer}s. 
+ */ + @Test + public void testPreadWithDirectByteBuffer() throws IOException { + testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + } + + /** + * Reads the entire testFile using the pread API and validates that its + * contents are properly loaded into the supplied {@link ByteBuffer}. + */ + private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + try (FSDataInputStream in = fs.open(testFile)) { + while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead, buffer.position()); + } + + // Make sure the buffer is full + assertFalse(buffer.hasRemaining()); + // Make sure the contents of the read buffer equal the contents of the + // file + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, fileContents); + buffer.position(buffer.limit()); + } + } + + /** + * Attempts to read the testFile into a {@link ByteBuffer} that is already + * full, and validates that doing so does not change the contents of the + * supplied {@link ByteBuffer}. + */ + private void testPreadWithFullByteBuffer(ByteBuffer buffer) + throws IOException { + // Load some dummy data into the buffer + byte[] existingBufferBytes = new byte[FILE_SIZE]; + rand.nextBytes(existingBufferBytes); + buffer.put(existingBufferBytes); + // Make sure the buffer is full + assertFalse(buffer.hasRemaining()); + + try (FSDataInputStream in = fs.open(testFile)) { + // Attempt to read into the buffer, 0 bytes should be read since the + // buffer is full + assertEquals(0, in.read(buffer)); + + // Double check the buffer is still full and its contents have not + // changed + assertFalse(buffer.hasRemaining()); + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, existingBufferBytes); + } + } + + /** + * Reads half of the testFile into the {@link ByteBuffer} by setting a + * {@link ByteBuffer#limit} on the buffer. Validates that only half of the + * testFile is loaded into the buffer. 
+ */ + private void testPreadWithLimitedByteBuffer( + ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + // Set the buffer limit to half the size of the file + buffer.limit(FILE_SIZE / 2); + + try (FSDataInputStream in = fs.open(testFile)) { + while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead, buffer.position()); + } + + // Since we set the buffer limit to half the size of the file, we should + // have only read half of the file into the buffer + assertEquals(totalBytesRead, FILE_SIZE / 2); + // Check that the buffer is full and the contents equal the first half of + // the file + assertFalse(buffer.hasRemaining()); + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE / 2]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, + Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); + } + } + + /** + * Reads half of the testFile into the {@link ByteBuffer} by setting the + * {@link ByteBuffer#position} the half the size of the file. Validates that + * only half of the testFile is loaded into the buffer. + */ + private void testPreadWithPositionedByteBuffer( + ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + // Set the buffer position to half the size of the file + buffer.position(FILE_SIZE / 2); + + try (FSDataInputStream in = fs.open(testFile)) { + while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position()); + } + + // Since we set the buffer position to half the size of the file, we + // should have only read half of the file into the buffer + assertEquals(totalBytesRead, FILE_SIZE / 2); + // Check that the buffer is full and the contents equal the first half of + // the file + assertFalse(buffer.hasRemaining()); + buffer.position(FILE_SIZE / 2); + byte[] bufferContents = new byte[FILE_SIZE / 2]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, + Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); + } + } + + /** + * Reads half of the testFile into the {@link ByteBuffer} by specifying a + * position for the pread API that is half of the file size. Validates that + * only half of the testFile is loaded into the buffer. 
+ */ + private void testPositionedPreadWithByteBuffer( + ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + + try (FSDataInputStream in = fs.open(testFile)) { + // Start reading from halfway through the file + while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2, + buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead, buffer.position()); + } + + // Since we starting reading halfway through the file, the buffer should + // only be half full + assertEquals(totalBytesRead, FILE_SIZE / 2); + assertEquals(buffer.position(), FILE_SIZE / 2); + assertTrue(buffer.hasRemaining()); + // Check that the buffer contents equal the second half of the file + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE / 2]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, + Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE)); + } + } + + @AfterClass + public static void shutdown() throws IOException { + try { + fs.delete(testFile, false); + fs.close(); + } finally { + cluster.shutdown(true); + } + } +}
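
Usage sketch (illustrative only; the class name, path, and buffer size are
placeholders, not part of the change): libhdfs above decides whether direct
reads are possible by querying the stream's capabilities instead of issuing a
zero-length probe read. A Java client can make the same check through the
public FileSystem API before attempting ByteBuffer reads.

    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class ReadByteBufferCapabilityCheck {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Placeholder path; any file readable by the client works here.
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
          ByteBuffer buf = ByteBuffer.allocate(4096);
          if (in.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
            // The wrapped stream implements ByteBufferReadable, so
            // read(ByteBuffer) will not throw UnsupportedOperationException.
            int n = in.read(buf);
            System.out.println("ByteBuffer read returned " + n + " bytes");
          } else {
            // Fall back to the byte[] read path.
            byte[] fallback = new byte[4096];
            int n = in.read(fallback, 0, fallback.length);
            System.out.println("byte[] read returned " + n + " bytes");
          }
        } finally {
          fs.close();
        }
      }
    }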