diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 5c879ecf41..67e8690456 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -740,6 +740,7 @@ public boolean hasCapability(String capability) {
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.DROPBEHIND:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.READBYTEBUFFER:
       return true;
     default:
       return false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
new file mode 100644
index 0000000000..873a521d00
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+  /**
+   * Reads up to {@code buf.remaining()} bytes into buf from a given position
+   * in the file and returns the number of bytes read. Callers should use
+   * {@code buf.limit(...)} to control the size of the desired read and
+   * {@code buf.position(...)} to control the offset into the buffer the data
+   * should be written to.
+   * <p>
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} should be unchanged.
+   * <p>
+   * In the case of an exception, the values of {@code buf.position()} and
+   * {@code buf.limit()} are undefined, and callers should be prepared to
+   * recover from this eventuality.
+   * <p>
+   * Many implementations will throw {@link UnsupportedOperationException}, so
+   * callers that are not confident in support for this method from the
+   * underlying filesystem should be prepared to handle that exception.
+   * <p>
+   * Implementations should treat 0-length requests as legitimate, and must not
+   * signal an error upon their receipt.
+   *
+   * @param position position within file
+   * @param buf the ByteBuffer to receive the results of the read operation.
+   * @return the number of bytes read, possibly zero, or -1 if reached
+   *         end-of-stream
+   * @throws IOException if there is some error performing the read
+   */
+  int read(long position, ByteBuffer buf) throws IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 3549cdc4fa..c52d30762f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -59,6 +59,12 @@ public interface StreamCapabilities {
*/
String UNBUFFER = "in:unbuffer";
+ /**
+ * Stream read(ByteBuffer) capability implemented by
+ * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
+ */
+ String READBYTEBUFFER = "in:readbytebuffer";
+
/**
* Capabilities that a stream can support and be queried for.
*/
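
Callers are expected to probe this key through StreamCapabilities#hasCapability before relying on read(ByteBuffer). The sketch below is not part of the patch; it assumes an FSDataInputStream, which implements both StreamCapabilities and ByteBufferReadable, and the helper class name is invented.

    // Hypothetical capability probe with a byte[] fallback.
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    class CapabilityProbeExample {
      static int readSome(FSDataInputStream in, ByteBuffer buf) throws IOException {
        if (in.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
          return in.read(buf);  // ByteBufferReadable path, no intermediate copy
        }
        byte[] tmp = new byte[buf.remaining()];  // fallback through a heap array
        int nRead = in.read(tmp, 0, tmp.length);
        if (nRead > 0) {
          buf.put(tmp, 0, nRead);
        }
        return nRead;
      }
    }
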
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index f47b88cb5a..a3e2ad5afc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1779,6 +1779,7 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.READAHEAD:
case StreamCapabilities.DROPBEHIND:
case StreamCapabilities.UNBUFFER:
+ case StreamCapabilities.READBYTEBUFFER:
return true;
default:
return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
index 0cced979ad..41caffd290 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
@@ -1013,7 +1013,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
return f{is|os};
*/
int accmode = flags & O_ACCMODE;
- jstring jStrBufferSize = NULL, jStrReplication = NULL;
+ jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL;
jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
jobject jFS = (jobject)fs;
jthrowable jthr;
@@ -1171,16 +1171,22 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
file->flags = 0;
if ((flags & O_WRONLY) == 0) {
- // Try a test read to see if we can do direct reads
- char buf;
- if (readDirect(fs, file, &buf, 0) == 0) {
- // Success - 0-byte read should return 0
+ // Check the StreamCapabilities of jFile to see if we can do direct reads
+ jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsOpenFile(%s): newJavaStr", path);
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, INSTANCE, jFile, HADOOP_ISTRM,
+ "hasCapability", "(Ljava/lang/String;)Z", jCapabilityString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsOpenFile(%s): FSDataInputStream#hasCapability", path);
+ goto done;
+ }
+ if (jVal.z) {
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
- } else if (errno != ENOTSUP) {
- // Unexpected error. Clear it, don't set the direct flag.
- fprintf(stderr,
- "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
- "for direct read compatibility\n", path, errno);
}
}
ret = 0;
@@ -1190,7 +1196,8 @@ done:
destroyLocalReference(env, jStrReplication);
destroyLocalReference(env, jConfiguration);
destroyLocalReference(env, jPath);
- destroyLocalReference(env, jFile);
+ destroyLocalReference(env, jFile);
+ destroyLocalReference(env, jCapabilityString);
if (ret) {
if (file) {
if (file->file) {
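
The JNI change above asks the opened FSDataInputStream for the "in:readbytebuffer" capability instead of issuing the old zero-length probe read; when the capability is reported, the HDFS_FILE_SUPPORTS_DIRECT_READ flag enables the ByteBuffer-based read path in libhdfs. At the Java level, the path being gated corresponds roughly to the sketch below (illustrative only; the file path and buffer size are made up):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class DirectReadExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {  // path is hypothetical
          if (in.hasCapability("in:readbytebuffer")) {
            ByteBuffer buf = ByteBuffer.allocateDirect(4096);  // off-heap buffer
            int nRead = in.read(buf);  // read(ByteBuffer), avoiding an extra on-heap copy
            System.out.println("read " + nRead + " bytes");
          }
        }
      }
    }
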
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
index 19d95b47e6..79771f0d7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
@@ -503,7 +503,10 @@ TEST_F(HdfsExtTest, TestReadStats) {
hdfsFileFreeReadStatistics(stats);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
- EXPECT_EQ(0, errno);
+ // Since libhdfs is not guaranteed to set errno to 0 on successful
+ // operations, we disable this check for now; see HDFS-14325 for a
+ // long-term solution to this problem.
+ // EXPECT_EQ(0, errno);
}
//Testing working directory
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java
new file mode 100644
index 0000000000..1d2bca61b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+/**
+ * This exception collects all IOExceptions thrown when adding block pools and
+ * scanning volumes. It keeps the information about which volume is associated
+ * with an exception.
+ *
+ */
+public class AddBlockPoolException extends IOException {
+ private Map