From d166bd184d42709c19165c875fb728e19d833c2f Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 6 Apr 2012 06:31:10 +0000 Subject: [PATCH] HDFS-3110. Use directRead API to reduce the number of buffer copies in libhdfs. Contributed by Henry Robinson. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1310185 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop-hdfs/src/main/native/hdfs.c | 163 ++++++++++++--- .../hadoop-hdfs/src/main/native/hdfs.h | 5 +- .../hadoop-hdfs/src/main/native/hdfs_test.c | 87 +++++++- .../src/main/native/tests/test-libhdfs.sh | 190 +++++++----------- 5 files changed, 290 insertions(+), 158 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 108f4b5431..78585bf52f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -70,6 +70,9 @@ Trunk (unreleased changes) HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. (Henry Robinson via todd) + HDFS-3110. Use directRead API to reduce the number of buffer copies in + libhdfs (Henry Robinson via todd) + BUG FIXES HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c index 7371caf090..946b31252a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c @@ -123,6 +123,11 @@ static int errnoFromException(jthrowable exc, JNIEnv *env, goto done; } + if (!strcmp(excClass, "java.lang.UnsupportedOperationException")) { + errnum = ENOTSUP; + goto done; + } + if (!strcmp(excClass, "org.apache.hadoop.security." "AccessControlException")) { errnum = EACCES; @@ -614,8 +619,29 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, } else { file->file = (*env)->NewGlobalRef(env, jVal.l); file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT); + file->flags = 0; destroyLocalReference(env, jVal.l); + + if ((flags & O_WRONLY) == 0) { + // Try a test read to see if we can do direct reads + errno = 0; + char buf; + if (readDirect(fs, file, &buf, 0) == 0) { + // Success - 0-byte read should return 0 + file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; + } else { + if (errno != ENOTSUP) { + // Unexpected error. Clear it, don't set the direct flag. + fprintf(stderr, + "WARN: Unexpected error %d when testing " + "for direct read compatibility\n", errno); + errno = 0; + goto done; + } + } + errno = 0; + } } done: @@ -706,10 +732,57 @@ int hdfsExists(hdfsFS fs, const char *path) return jVal.z ? 0 : -1; } +// Checks input file for readiness for reading. +static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, + jobject* jInputStream) +{ + *jInputStream = (jobject)(f ? f->file : NULL); + //Sanity check + if (!f || f->type == UNINITIALIZED) { + errno = EBADF; + return -1; + } + + //Error checking... make sure that this file is 'readable' + if (f->type != INPUT) { + fprintf(stderr, "Cannot read from a non-InputStream object!\n"); + errno = EINVAL; + return -1; + } + + return 0; +} + +// Common error-handling code between read paths. +static int handleReadResult(int success, jvalue jVal, jthrowable jExc, + JNIEnv* env) +{ + int noReadBytes; + if (success != 0) { + if ((*env)->ExceptionCheck(env)) { + errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." + "FSDataInputStream::read"); + } + noReadBytes = -1; + } else { + noReadBytes = jVal.i; + if (noReadBytes < 0) { + // -1 from Java is EOF, which is 0 here + noReadBytes = 0; + } + errno = 0; + } + + return noReadBytes; +} tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { + if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { + return readDirect(fs, f, buffer, length); + } + // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(bR); @@ -722,46 +795,26 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) } //Parameters - jobject jInputStream = (jobject)(f ? f->file : NULL); + jobject jInputStream; + if (readPrepare(env, fs, f, &jInputStream) == -1) { + return -1; + } jbyteArray jbRarray; jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; - //Sanity check - if (!f || f->type == UNINITIALIZED) { - errno = EBADF; - return -1; - } - - //Error checking... make sure that this file is 'readable' - if (f->type != INPUT) { - fprintf(stderr, "Cannot read from a non-InputStream object!\n"); - errno = EINVAL; - return -1; - } - //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); - if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, - "read", "([B)I", jbRarray) != 0) { - errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." - "FSDataInputStream::read"); - noReadBytes = -1; - } - else { - noReadBytes = jVal.i; - if (noReadBytes > 0) { - (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); - } else { - //This is a valid case: there aren't any bytes left to read! - if (noReadBytes == 0 || noReadBytes < -1) { - fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes); - } - noReadBytes = 0; - } - errno = 0; + + int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, + "read", "([B)I", jbRarray); + + noReadBytes = handleReadResult(success, jVal, jExc, env);; + + if (noReadBytes > 0) { + (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); } destroyLocalReference(env, jbRarray); @@ -769,6 +822,52 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) return noReadBytes; } +// Reads using the read(ByteBuffer) API, which does fewer copies +tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) +{ + // JAVA EQUIVALENT: + // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer + // fis.read(bbuffer); + + //Get the JNIEnv* corresponding to current thread + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + jobject jInputStream; + if (readPrepare(env, fs, f, &jInputStream) == -1) { + return -1; + } + + jint noReadBytes = 0; + jvalue jVal; + jthrowable jExc = NULL; + + //Read the requisite bytes + jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length); + if (bb == NULL) { + fprintf(stderr, "Could not allocate ByteBuffer"); + if ((*env)->ExceptionCheck(env)) { + errno = errnoFromException(NULL, env, "JNIEnv::NewDirectByteBuffer"); + } else { + errno = ENOMEM; // Best guess if there's no exception waiting + } + return -1; + } + + int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, + HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", + bb); + + noReadBytes = handleReadResult(success, jVal, jExc, env); + + destroyLocalReference(env, bb); + + return noReadBytes; +} + tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h index 0ee29d50ad..67bd288e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h @@ -81,12 +81,16 @@ extern "C" { }; + // Bit fields for hdfsFile_internal flags + #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) + /** * The 'file-handle' to a file in hdfs. */ struct hdfsFile_internal { void* file; enum hdfsStreamType type; + uint32_t flags; }; typedef struct hdfsFile_internal* hdfsFile; @@ -203,7 +207,6 @@ extern "C" { */ tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); - /** * hdfsPread - Positional read of data from an open file. * @param fs The configured filesystem handle. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c index 2e6545de5a..21a4f8190e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c @@ -18,6 +18,8 @@ #include "hdfs.h" +tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); + void permission_disp(short permissions, char *rtr) { rtr[9] = '\0'; int i; @@ -51,7 +53,6 @@ void permission_disp(short permissions, char *rtr) { } int main(int argc, char **argv) { - hdfsFS fs = hdfsConnectNewInstance("default", 0); if(!fs) { fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); @@ -64,20 +65,25 @@ int main(int argc, char **argv) { exit(-1); } - const char* writePath = "/tmp/testfile.txt"; + const char* writePath = "/tmp/testfile.txt"; + const char* fileContents = "Hello, World!"; + { //Write tests - hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); if(!writeFile) { fprintf(stderr, "Failed to open %s for writing!\n", writePath); exit(-1); } fprintf(stderr, "Opened %s for writing successfully...\n", writePath); - - char* buffer = "Hello, World!"; - tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); + tSize num_written_bytes = + hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents)+1); + if (num_written_bytes != strlen(fileContents) + 1) { + fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n", + (int)(strlen(fileContents) + 1), (int)num_written_bytes); + exit(-1); + } fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); tOffset currentPos = -1; @@ -138,18 +144,86 @@ int main(int argc, char **argv) { } fprintf(stderr, "Current position: %ld\n", currentPos); + if ((readFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) == 0) { + fprintf(stderr, "Direct read support incorrectly not detected " + "for HDFS filesystem\n"); + exit(-1); + } + + fprintf(stderr, "Direct read support detected for HDFS\n"); + + // Clear flags so that we really go through slow read path + readFile->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; + static char buffer[32]; tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, sizeof(buffer)); fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, sizeof(buffer)); fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); + if (hdfsSeek(fs, readFile, 0L)) { + fprintf(stderr, + "Failed to seek to file start for direct read test!\n"); + exit(-1); + } + + readFile->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; + + memset(buffer, 0, strlen(fileContents + 1)); + num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, + sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_read_bytes); + exit(-1); + } + fprintf(stderr, "Read (direct) following %d bytes:\n%s\n", + num_read_bytes, buffer); hdfsCloseFile(fs, readFile); + + // Test correct behaviour for unsupported filesystems + hdfsFile localFile = hdfsOpenFile(lfs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); + if(!localFile) { + fprintf(stderr, "Failed to open %s for writing!\n", writePath); + exit(-1); + } + + tSize num_written_bytes = hdfsWrite(lfs, localFile, + (void*)fileContents, + strlen(fileContents) + 1); + + hdfsCloseFile(lfs, localFile); + localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0); + + if (localFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { + fprintf(stderr, "Direct read support incorrectly detected for local " + "filesystem\n"); + exit(-1); + } + + memset(buffer, 0, strlen(fileContents + 1)); + int result = readDirect(lfs, localFile, (void*)buffer, sizeof(buffer)); + if (result != -1) { + fprintf(stderr, "Expected error from local direct read not seen!\n"); + exit(-1); + } + + if (errno != ENOTSUP) { + fprintf(stderr, "Error code not correctly set to ENOTSUP, was %d!\n", + errno); + exit(-1); + } + + fprintf(stderr, "Expected exception thrown for unsupported direct read\n"); + + hdfsCloseFile(lfs, localFile); } int totalResult = 0; @@ -446,4 +520,3 @@ int main(int argc, char **argv) { /** * vim: ts=4: sw=4: et: */ - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh index c2e4fa5693..51bb15f45d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh @@ -17,126 +17,64 @@ # # -# Note: This script depends on 8 environment variables to function correctly: -# a) CLASSPATH -# b) HADOOP_PREFIX -# c) HADOOP_CONF_DIR -# d) HADOOP_LOG_DIR -# e) LIBHDFS_BUILD_DIR -# f) LIBHDFS_INSTALL_DIR -# g) OS_NAME -# h) CLOVER_JAR -# i} HADOOP_VERSION -# j) HADOOP_HDFS_HOME -# All these are passed by build.xml. +# Note: This script depends on 5 environment variables to function correctly: +# a) HADOOP_HOME - must be set +# b) HDFS_TEST_CONF_DIR - optional; the directory to read and write +# core-site.xml to. Defaults to /tmp +# c) LIBHDFS_BUILD_DIR - optional; the location of the hdfs_test +# executable. Defaults to the parent directory. +# d) OS_NAME - used to choose how to locate libjvm.so +# e) CLOVER_JAR - optional; the location of the Clover code coverage tool's jar. # +if [ "x$HADOOP_HOME" == "x" ]; then + echo "HADOOP_HOME is unset!" + exit 1 +fi + +if [ "x$LIBHDFS_BUILD_DIR" == "x" ]; then + LIBHDFS_BUILD_DIR=`pwd`/../ +fi + +if [ "x$HDFS_TEST_CONF_DIR" == "x" ]; then + HDFS_TEST_CONF_DIR=/tmp +fi + +# LIBHDFS_INSTALL_DIR is the directory containing libhdfs.so +LIBHDFS_INSTALL_DIR=$HADOOP_HOME/lib/native/ HDFS_TEST=hdfs_test -HADOOP_LIB_DIR=$HADOOP_PREFIX/lib -HADOOP_BIN_DIR=$HADOOP_PREFIX/bin -COMMON_BUILD_DIR=$HADOOP_PREFIX/build/ivy/lib/hadoop-hdfs/common -COMMON_JAR=$COMMON_BUILD_DIR/hadoop-common-$HADOOP_VERSION.jar +HDFS_TEST_JAR=`find $HADOOP_HOME/share/hadoop/hdfs/ \ +-name "hadoop-hdfs-*-tests.jar" | head -n 1` -cat > $HADOOP_CONF_DIR/core-site.xml < - - - - hadoop.tmp.dir - file:///$LIBHDFS_TEST_DIR - - - fs.default.name - hdfs://localhost:23000/ - - -EOF - -cat > $HADOOP_CONF_DIR/hdfs-site.xml < - - - - dfs.replication - 1 - - - dfs.support.append - true - - - dfs.namenode.logging.level - DEBUG - - -EOF - -cat > $HADOOP_CONF_DIR/slaves < /tmp/libhdfs-test-cluster.out 2>&1 & + +MINI_CLUSTER_PID=$! +for i in {1..15}; do + echo "Waiting for DFS cluster, attempt $i of 15" + [ -f $HDFS_TEST_CONF_DIR/core-site.xml ] && break; + sleep 2 +done + +if [ ! -f $HDFS_TEST_CONF_DIR/core-site.xml ]; then + echo "Cluster did not come up in 30s" + kill -9 $MINI_CLUSTER_PID + exit 1 fi -echo exiting with $BUILD_STATUS +echo "Cluster up, running tests" +# Disable error checking to make sure we get to cluster cleanup +set +e + +CLASSPATH=$CLASSPATH \ +LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" \ +$LIBHDFS_BUILD_DIR/$HDFS_TEST + +BUILD_STATUS=$? + +echo "Tearing cluster down" +kill -9 $MINI_CLUSTER_PID +echo "Exiting with $BUILD_STATUS" exit $BUILD_STATUS