From 6192c1fe3b4006c1ecb8f8b00cc3b1119b100e6a Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Wed, 6 Mar 2019 15:02:18 -0800 Subject: [PATCH] Revert "HDFS-14111. hdfsOpenFile on HDFS causes unnecessary IO from file offset 0. Contributed by Sahil Takiar." This reverts commit f5a4b43a4944e1fcbdc86fc91c3a5179218cefdc. --- .../hadoop/crypto/CryptoInputStream.java | 1 - .../fs/ByteBufferPositionedReadable.java | 64 ----- .../apache/hadoop/fs/StreamCapabilities.java | 6 - .../apache/hadoop/hdfs/DFSInputStream.java | 1 - .../src/main/native/libhdfs/hdfs.c | 29 +- .../native/libhdfspp/tests/hdfs_ext_test.cc | 5 +- .../fsdataset/impl/AddBlockPoolException.java | 27 -- .../hadoop/hdfs/TestByteBufferPread.java | 269 ------------------ 8 files changed, 12 insertions(+), 390 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 67e8690456..5c879ecf41 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -740,7 +740,6 @@ public boolean hasCapability(String capability) { case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: - case StreamCapabilities.READBYTEBUFFER: return true; default: return false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java deleted file mode 100644 index 873a521d00..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * Implementers of this interface provide a positioned read API that writes to a - * {@link ByteBuffer} rather than a {@code byte[]}. 
- * - * @see PositionedReadable - * @see ByteBufferReadable - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface ByteBufferPositionedReadable { - /** - * Reads up to {@code buf.remaining()} bytes into buf from a given position - * in the file and returns the number of bytes read. Callers should use - * {@code buf.limit(...)} to control the size of the desired read and - * {@code buf.position(...)} to control the offset into the buffer the data - * should be written to. - * <p>
- * After a successful call, {@code buf.position()} will be advanced by the - * number of bytes read and {@code buf.limit()} should be unchanged. - * <p>
- * In the case of an exception, the values of {@code buf.position()} and - * {@code buf.limit()} are undefined, and callers should be prepared to - * recover from this eventuality. - * <p>
- * Many implementations will throw {@link UnsupportedOperationException}, so - * callers that are not confident in support for this method from the - * underlying filesystem should be prepared to handle that exception. - * <p>
- * Implementations should treat 0-length requests as legitimate, and must not - * signal an error upon their receipt. - * - * @param position position within file - * @param buf the ByteBuffer to receive the results of the read operation. - * @return the number of bytes read, possibly zero, or -1 if reached - * end-of-stream - * @throws IOException if there is some error performing the read - */ - int read(long position, ByteBuffer buf) throws IOException; -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index c52d30762f..3549cdc4fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -59,12 +59,6 @@ public interface StreamCapabilities { */ String UNBUFFER = "in:unbuffer"; - /** - * Stream read(ByteBuffer) capability implemented by - * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}. - */ - String READBYTEBUFFER = "in:readbytebuffer"; - /** * Capabilities that a stream can support and be queried for. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index a3e2ad5afc..f47b88cb5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1779,7 +1779,6 @@ public boolean hasCapability(String capability) { case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: - case StreamCapabilities.READBYTEBUFFER: return true; default: return false; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c index 41caffd290..0cced979ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c @@ -1013,7 +1013,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, return f{is|os}; */ int accmode = flags & O_ACCMODE; - jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL; + jstring jStrBufferSize = NULL, jStrReplication = NULL; jobject jConfiguration = NULL, jPath = NULL, jFile = NULL; jobject jFS = (jobject)fs; jthrowable jthr; @@ -1171,22 +1171,16 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, file->flags = 0; if ((flags & O_WRONLY) == 0) { - // Check the StreamCapabilities of jFile to see if we can do direct reads - jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsOpenFile(%s): newJavaStr", path); - goto done; - } - jthr = invokeMethod(env, &jVal, INSTANCE, jFile, HADOOP_ISTRM, - "hasCapability", "(Ljava/lang/String;)Z", jCapabilityString); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsOpenFile(%s): FSDataInputStream#hasCapability", path); - goto done; - } - if (jVal.z) { + // Try a test read to see if we can do direct reads + char buf; + if (readDirect(fs, file, &buf, 0) == 0) { + // Success - 0-byte read 
should return 0 file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; + } else if (errno != ENOTSUP) { + // Unexpected error. Clear it, don't set the direct flag. + fprintf(stderr, + "hdfsOpenFile(%s): WARN: Unexpected error %d when testing " + "for direct read compatibility\n", path, errno); } } ret = 0; @@ -1196,8 +1190,7 @@ done: destroyLocalReference(env, jStrReplication); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jPath); - destroyLocalReference(env, jFile); - destroyLocalReference(env, jCapabilityString); + destroyLocalReference(env, jFile); if (ret) { if (file) { if (file->file) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc index 79771f0d7c..19d95b47e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc @@ -503,10 +503,7 @@ TEST_F(HdfsExtTest, TestReadStats) { hdfsFileFreeReadStatistics(stats); EXPECT_EQ(0, hdfsCloseFile(fs, file)); - // Since libhdfs is not guaranteed to set errno to 0 on successful - // operations, we disable this check for now, see HDFS-14325 for a - // long term solution to this problem - // EXPECT_EQ(0, errno); + EXPECT_EQ(0, errno); } //Testing working directory diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java deleted file mode 100644 index 1d2bca61b3..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; - -import java.io.IOException; -import java.util.Map; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; - -/** - * This exception collects all IOExceptions thrown when adding block pools and - * scanning volumes. It keeps the information about which volume is associated - * with an exception. - * - */ -public class AddBlockPoolException extends IOException { - private Map unhealthyDataDirs; - public AddBlockPoolException(Map - unhealthyDataDirs) { - this.unhealthyDataDirs = unhealthyDataDirs; - } - - public Map getFailingVolumes() { - return unhealthyDataDirs; - } - @Override - public String toString() { - return getClass().getName() + ": " + unhealthyDataDirs.toString(); - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java deleted file mode 100644 index 64f2d06ab1..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Random; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * This class tests the DFS positional read functionality on a single node - * mini-cluster. These tests are inspired from {@link TestPread}. The tests - * are much less comprehensive than other pread tests because pread already - * internally uses {@link ByteBuffer}s. - */ -public class TestByteBufferPread { - - private static MiniDFSCluster cluster; - private static FileSystem fs; - private static byte[] fileContents; - private static Path testFile; - private static Random rand; - - private static final long SEED = 0xDEADBEEFL; - private static final int BLOCK_SIZE = 4096; - private static final int FILE_SIZE = 12 * BLOCK_SIZE; - - @BeforeClass - public static void setup() throws IOException { - // Setup the cluster with a small block size so we can create small files - // that span multiple blocks - Configuration conf = new Configuration(); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); - fs = cluster.getFileSystem(); - - // Create a test file that spans 12 blocks, and contains a bunch of random - // bytes - fileContents = new byte[FILE_SIZE]; - rand = new Random(SEED); - rand.nextBytes(fileContents); - testFile = new Path("/byte-buffer-pread-test.dat"); - try (FSDataOutputStream out = fs.create(testFile, (short) 3)) { - out.write(fileContents); - } - } - - /** - * Test preads with {@link java.nio.HeapByteBuffer}s. - */ - @Test - public void testPreadWithHeapByteBuffer() throws IOException { - testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); - testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE)); - testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); - testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); - testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); - } - - /** - * Test preads with {@link java.nio.DirectByteBuffer}s. 
- */ - @Test - public void testPreadWithDirectByteBuffer() throws IOException { - testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); - testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); - testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); - testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); - testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); - } - - /** - * Reads the entire testFile using the pread API and validates that its - * contents are properly loaded into the supplied {@link ByteBuffer}. - */ - private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException { - int bytesRead; - int totalBytesRead = 0; - try (FSDataInputStream in = fs.open(testFile)) { - while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { - totalBytesRead += bytesRead; - // Check that each call to read changes the position of the ByteBuffer - // correctly - assertEquals(totalBytesRead, buffer.position()); - } - - // Make sure the buffer is full - assertFalse(buffer.hasRemaining()); - // Make sure the contents of the read buffer equal the contents of the - // file - buffer.position(0); - byte[] bufferContents = new byte[FILE_SIZE]; - buffer.get(bufferContents); - assertArrayEquals(bufferContents, fileContents); - buffer.position(buffer.limit()); - } - } - - /** - * Attempts to read the testFile into a {@link ByteBuffer} that is already - * full, and validates that doing so does not change the contents of the - * supplied {@link ByteBuffer}. - */ - private void testPreadWithFullByteBuffer(ByteBuffer buffer) - throws IOException { - // Load some dummy data into the buffer - byte[] existingBufferBytes = new byte[FILE_SIZE]; - rand.nextBytes(existingBufferBytes); - buffer.put(existingBufferBytes); - // Make sure the buffer is full - assertFalse(buffer.hasRemaining()); - - try (FSDataInputStream in = fs.open(testFile)) { - // Attempt to read into the buffer, 0 bytes should be read since the - // buffer is full - assertEquals(0, in.read(buffer)); - - // Double check the buffer is still full and its contents have not - // changed - assertFalse(buffer.hasRemaining()); - buffer.position(0); - byte[] bufferContents = new byte[FILE_SIZE]; - buffer.get(bufferContents); - assertArrayEquals(bufferContents, existingBufferBytes); - } - } - - /** - * Reads half of the testFile into the {@link ByteBuffer} by setting a - * {@link ByteBuffer#limit} on the buffer. Validates that only half of the - * testFile is loaded into the buffer. 
- */ - private void testPreadWithLimitedByteBuffer( - ByteBuffer buffer) throws IOException { - int bytesRead; - int totalBytesRead = 0; - // Set the buffer limit to half the size of the file - buffer.limit(FILE_SIZE / 2); - - try (FSDataInputStream in = fs.open(testFile)) { - while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { - totalBytesRead += bytesRead; - // Check that each call to read changes the position of the ByteBuffer - // correctly - assertEquals(totalBytesRead, buffer.position()); - } - - // Since we set the buffer limit to half the size of the file, we should - // have only read half of the file into the buffer - assertEquals(totalBytesRead, FILE_SIZE / 2); - // Check that the buffer is full and the contents equal the first half of - // the file - assertFalse(buffer.hasRemaining()); - buffer.position(0); - byte[] bufferContents = new byte[FILE_SIZE / 2]; - buffer.get(bufferContents); - assertArrayEquals(bufferContents, - Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); - } - } - - /** - * Reads half of the testFile into the {@link ByteBuffer} by setting the - * {@link ByteBuffer#position} the half the size of the file. Validates that - * only half of the testFile is loaded into the buffer. - */ - private void testPreadWithPositionedByteBuffer( - ByteBuffer buffer) throws IOException { - int bytesRead; - int totalBytesRead = 0; - // Set the buffer position to half the size of the file - buffer.position(FILE_SIZE / 2); - - try (FSDataInputStream in = fs.open(testFile)) { - while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { - totalBytesRead += bytesRead; - // Check that each call to read changes the position of the ByteBuffer - // correctly - assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position()); - } - - // Since we set the buffer position to half the size of the file, we - // should have only read half of the file into the buffer - assertEquals(totalBytesRead, FILE_SIZE / 2); - // Check that the buffer is full and the contents equal the first half of - // the file - assertFalse(buffer.hasRemaining()); - buffer.position(FILE_SIZE / 2); - byte[] bufferContents = new byte[FILE_SIZE / 2]; - buffer.get(bufferContents); - assertArrayEquals(bufferContents, - Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); - } - } - - /** - * Reads half of the testFile into the {@link ByteBuffer} by specifying a - * position for the pread API that is half of the file size. Validates that - * only half of the testFile is loaded into the buffer. 
- */ - private void testPositionedPreadWithByteBuffer( - ByteBuffer buffer) throws IOException { - int bytesRead; - int totalBytesRead = 0; - - try (FSDataInputStream in = fs.open(testFile)) { - // Start reading from halfway through the file - while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2, - buffer)) > 0) { - totalBytesRead += bytesRead; - // Check that each call to read changes the position of the ByteBuffer - // correctly - assertEquals(totalBytesRead, buffer.position()); - } - - // Since we starting reading halfway through the file, the buffer should - // only be half full - assertEquals(totalBytesRead, FILE_SIZE / 2); - assertEquals(buffer.position(), FILE_SIZE / 2); - assertTrue(buffer.hasRemaining()); - // Check that the buffer contents equal the second half of the file - buffer.position(0); - byte[] bufferContents = new byte[FILE_SIZE / 2]; - buffer.get(bufferContents); - assertArrayEquals(bufferContents, - Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE)); - } - } - - @AfterClass - public static void shutdown() throws IOException { - try { - fs.delete(testFile, false); - fs.close(); - } finally { - cluster.shutdown(true); - } - } -}
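
Note: the change reverted here (HDFS-14111) made hdfsOpenFile() decide whether direct reads are possible by asking the opened FSDataInputStream for the "in:readbytebuffer" capability, while this revert restores the older zero-length readDirect() probe; that probe read at open time is the unnecessary IO from offset 0 described in the HDFS-14111 subject line. As a rough illustration only, and not part of this patch, the sketch below shows what the capability-based check looks like from plain Java. The file path and the helper name are placeholders, and the literal "in:readbytebuffer" string is used because the StreamCapabilities.READBYTEBUFFER constant is removed by this revert.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ByteBufferReadProbe {
      // Hypothetical helper mirroring the hasCapability("in:readbytebuffer")
      // check that HDFS-14111 added to hdfsOpenFile and this commit removes.
      static boolean supportsByteBufferRead(FSDataInputStream in) {
        return in.hasCapability("in:readbytebuffer");
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
          ByteBuffer buf = ByteBuffer.allocateDirect(4096);
          if (supportsByteBufferRead(in)) {
            // The stream advertises ByteBuffer reads; read straight into the
            // buffer. This is the case libhdfs marks with
            // HDFS_FILE_SUPPORTS_DIRECT_READ.
            int n = in.read(buf);
            System.out.println("ByteBuffer read returned " + n + " bytes");
          } else {
            // Fallback comparable to the non-direct libhdfs path: copy
            // through an ordinary byte[].
            byte[] b = new byte[4096];
            int n = in.read(b);
            System.out.println("byte[] read returned " + n + " bytes");
          }
        }
      }
    }

On streams that do not support it, FSDataInputStream#read(ByteBuffer) throws UnsupportedOperationException, which is why either the capability query above or the restored zero-length probe has to run before the direct-read flag is set.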