From 02a5551ba3d3d44b79bf594816263d625cdca36f Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Fri, 20 Jul 2012 19:15:52 +0000 Subject: [PATCH] HDFS-3608. fuse_dfs: detect changes in UID ticket cache. Contributed by Colin Patrick McCabe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1363904 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop-hdfs/src/CMakeLists.txt | 1 + .../src/main/native/fuse-dfs/CMakeLists.txt | 1 + .../src/main/native/fuse-dfs/fuse_connect.c | 638 ++++++++++++--- .../src/main/native/fuse-dfs/fuse_connect.h | 70 +- .../native/fuse-dfs/fuse_context_handle.h | 2 - .../src/main/native/fuse-dfs/fuse_dfs.c | 37 +- .../main/native/fuse-dfs/fuse_file_handle.h | 4 +- .../main/native/fuse-dfs/fuse_impls_chmod.c | 16 +- .../main/native/fuse-dfs/fuse_impls_chown.c | 28 +- .../main/native/fuse-dfs/fuse_impls_flush.c | 5 +- .../main/native/fuse-dfs/fuse_impls_getattr.c | 28 +- .../main/native/fuse-dfs/fuse_impls_mkdir.c | 28 +- .../main/native/fuse-dfs/fuse_impls_open.c | 65 +- .../main/native/fuse-dfs/fuse_impls_read.c | 9 +- .../main/native/fuse-dfs/fuse_impls_readdir.c | 26 +- .../main/native/fuse-dfs/fuse_impls_release.c | 6 +- .../main/native/fuse-dfs/fuse_impls_rename.c | 29 +- .../main/native/fuse-dfs/fuse_impls_rmdir.c | 46 +- .../main/native/fuse-dfs/fuse_impls_statfs.c | 35 +- .../native/fuse-dfs/fuse_impls_truncate.c | 22 +- .../main/native/fuse-dfs/fuse_impls_unlink.c | 31 +- .../main/native/fuse-dfs/fuse_impls_utimens.c | 25 +- .../main/native/fuse-dfs/fuse_impls_write.c | 7 +- .../src/main/native/fuse-dfs/fuse_init.c | 26 +- .../src/main/native/libhdfs/hdfs.c | 72 +- .../src/main/native/libhdfs/hdfs.h | 24 +- .../src/main/native/libhdfs/jni_helper.c | 6 + .../hadoop-hdfs/src/main/native/util/tree.h | 765 ++++++++++++++++++ .../src/main/resources/hdfs-default.xml | 21 + 30 files changed, 1740 insertions(+), 336 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/tree.h diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cc16ac3563..7a2c6acf99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -520,6 +520,9 @@ Branch-2 ( Unreleased changes ) HDFS-3597. SNN fails to start after DFS upgrade. (Andy Isaacson via todd) + HDFS-3608. fuse_dfs: detect changes in UID ticket cache. (Colin Patrick + McCabe via atm) + BREAKDOWN OF HDFS-3042 SUBTASKS HDFS-2185. 
HDFS portion of ZK-based FailoverController (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt index 4a96bbd7a0..7c1441fca5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt @@ -87,6 +87,7 @@ include_directories( ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${JNI_INCLUDE_DIRS} + main/native main/native/libhdfs ) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt index fb3c580e94..f04870957e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt @@ -69,5 +69,6 @@ IF(FUSE_FOUND) ${JAVA_JVM_LIBRARY} hdfs m + pthread ) ENDIF(FUSE_FOUND) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c index bfb7a1eedb..c6624fad99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c @@ -16,17 +16,38 @@ * limitations under the License. */ -#include "hdfs.h" -#include "fuse_dfs.h" #include "fuse_connect.h" +#include "fuse_dfs.h" #include "fuse_users.h" +#include "libhdfs/hdfs.h" +#include "util/tree.h" +#include <inttypes.h> #include <limits.h> +#include <poll.h> #include <search.h> #include <stdio.h> #include <stdlib.h> +#include <string.h> +#include <sys/stat.h> +#include <time.h> -#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication" +#define FUSE_CONN_DEFAULT_TIMER_PERIOD 5 +#define FUSE_CONN_DEFAULT_EXPIRY_PERIOD (5 * 60) +#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication" +#define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout" +#define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period" + +/** Length of the buffer needed by asctime_r */ +#define TIME_STR_LEN 26 + +struct hdfsConn; + +static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b); +static void hdfsConnExpiry(void); +static void* hdfsConnExpiryThread(void *v); + +RB_HEAD(hdfsConnTree, hdfsConn); enum authConf { AUTH_CONF_UNKNOWN, @@ -34,58 +55,54 @@ enum authConf { AUTH_CONF_OTHER, }; -#define MAX_ELEMENTS (16 * 1024) -static struct hsearch_data *fsTable = NULL; -static enum authConf hdfsAuthConf = AUTH_CONF_UNKNOWN; -static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER; +struct hdfsConn { + RB_ENTRY(hdfsConn) entry; + /** How many threads are currently using this hdfsConnection object */ + int64_t refcnt; + /** The username used to make this connection. Dynamically allocated. */ + char *usrname; + /** Kerberos ticket cache path, or NULL if this is not a kerberized + * connection. Dynamically allocated. */ + char *kpath; + /** mtime of the kpath, if the kpath is non-NULL */ + time_t kPathMtime; + /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */ + long kPathMtimeNs; + /** The cached libhdfs fs instance */ + hdfsFS fs; + /** Nonzero if this hdfs connection needs to be closed as soon as possible. + * If this is true, the connection has been removed from the tree. */ + int condemned; + /** Number of times we should run the expiration timer on this connection + * before removing it. */ + int expirationCount; +}; -/* - * Allocate a hash table for fs handles. Returns 0 on success, - * -1 on failure. - */ -int allocFsTable(void) { - assert(NULL == fsTable); - fsTable = calloc(1, sizeof(struct hsearch_data)); - if (0 == hcreate_r(MAX_ELEMENTS, fsTable)) { - ERROR("Unable to initialize connection table"); - return -1; - } - return 0; -} +RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare); -/* - * Find a fs handle for the given key. Returns a fs handle, - * or NULL if there is no fs for the given key. - */ -static hdfsFS findFs(char *key) { - ENTRY entry; - ENTRY *entryP = NULL; - entry.key = key; - if (0 == hsearch_r(entry, FIND, &entryP, fsTable)) { - return NULL; - } - assert(NULL != entryP->data); - return (hdfsFS)entryP->data; -} +/** Current cached libhdfs connections */ +static struct hdfsConnTree gConnTree; -/* - * Insert the given fs handle into the table. - * Returns 0 on success, -1 on failure. - */ -static int insertFs(char *key, hdfsFS fs) { - ENTRY entry; - ENTRY *entryP = NULL; - assert(NULL != fs); - entry.key = strdup(key); - if (entry.key == NULL) { - return -1; - } - entry.data = (void*)fs; - if (0 == hsearch_r(entry, ENTER, &entryP, fsTable)) { - return -1; - } - return 0; -} +/** The URI used to make our connections. Dynamically allocated. */ +static char *gUri; + +/** The port used to make our connections, or 0. */ +static int gPort; + +/** Lock which protects gConnTree and the connections it contains */ +static pthread_mutex_t gConnMutex; + +/** Type of authentication configured */ +static enum authConf gHdfsAuthConf; + +/** FUSE connection timer expiration period */ +static int32_t gTimerPeriod; + +/** FUSE connection expiry period */ +static int32_t gExpiryPeriod; + +/** FUSE timer expiration thread */ +static pthread_t gTimerThread;
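The globals above replace the old fixed-size hsearch table with a per-user red-black tree: each cached libhdfs connection is keyed by username and linked into gConnTree through its embedded RB_ENTRY. As a rough standalone sketch of the sys/tree.h macro pattern used here (toy type and names are hypothetical, not part of the patch), a keyed tree needs the embedded entry, a comparator, RB_HEAD/RB_GENERATE, and lookups through a stack-allocated exemplar, just as hdfsConnFind does later in this file:

#include <stdio.h>
#include <string.h>
#include "util/tree.h"   /* the BSD red-black tree macros vendored by this patch */

/* Toy analogue of struct hdfsConn, keyed by name. */
struct node {
  RB_ENTRY(node) entry;  /* embedded tree linkage, like hdfsConn.entry */
  const char *name;      /* lookup key, like hdfsConn.usrname */
};

static int node_cmp(const struct node *a, const struct node *b)
{
  return strcmp(a->name, b->name);
}

RB_HEAD(node_tree, node);
RB_GENERATE(node_tree, node, entry, node_cmp);

int main(void)
{
  struct node_tree head = RB_INITIALIZER(&head);
  struct node a = { .name = "alice" }, b = { .name = "bob" };
  struct node exemplar, *found;

  RB_INSERT(node_tree, &head, &a);
  RB_INSERT(node_tree, &head, &b);

  /* Search with a throwaway exemplar that carries only the key. */
  memset(&exemplar, 0, sizeof(exemplar));
  exemplar.name = "bob";
  found = RB_FIND(node_tree, &head, &exemplar);
  printf("found: %s\n", found ? found->name : "(none)");
  RB_REMOVE(node_tree, &head, found);
  return 0;
}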
/** * Find out what type of authentication the system administrator @@ -95,19 +112,251 @@ static int insertFs(char *key, hdfsFS fs) { */ static enum authConf discoverAuthConf(void) { - int ret; - char *val = NULL; - enum authConf authConf; + int ret; + char *val = NULL; + enum authConf authConf; - ret = hdfsConfGet(HADOOP_SECURITY_AUTHENTICATION, &val); - if (ret) - authConf = AUTH_CONF_UNKNOWN; - else if (!strcmp(val, "kerberos")) - authConf = AUTH_CONF_KERBEROS; - else - authConf = AUTH_CONF_OTHER; - free(val); - return authConf; + ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val); + if (ret) + authConf = AUTH_CONF_UNKNOWN; + else if (!val) + authConf = AUTH_CONF_OTHER; + else if (!strcmp(val, "kerberos")) + authConf = AUTH_CONF_KERBEROS; + else + authConf = AUTH_CONF_OTHER; + free(val); + return authConf; +} + +int fuseConnectInit(const char *nnUri, int port) +{ + int ret; + + gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD; + ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod); + if (ret) { + fprintf(stderr, "Unable to determine the configured value for %s.\n", + HADOOP_FUSE_TIMER_PERIOD); + return -EINVAL; + } + if (gTimerPeriod < 1) { + fprintf(stderr, "Invalid value %d given for %s.\n", + gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD); + return -EINVAL; + } + gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD; + ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod); + if (ret) { + fprintf(stderr, "Unable to determine the configured value for %s.\n", + HADOOP_FUSE_CONNECTION_TIMEOUT); + return -EINVAL; + } + if (gExpiryPeriod < 1) { + fprintf(stderr, "Invalid value %d given for %s.\n", + gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT); + return -EINVAL; + } + gHdfsAuthConf = discoverAuthConf(); + if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) { + fprintf(stderr, "Unable to determine the configured value for %s.\n", + HADOOP_SECURITY_AUTHENTICATION); + return -EINVAL; + } + gPort = port; + gUri = strdup(nnUri); + if (!gUri) { + fprintf(stderr, "fuseConnectInit: OOM allocating nnUri\n"); + return -ENOMEM; + } + ret = pthread_mutex_init(&gConnMutex, NULL); + if (ret) { + free(gUri); + fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n", + ret); + return -ret; + } + RB_INIT(&gConnTree); + ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL); + if (ret) { + free(gUri); + pthread_mutex_destroy(&gConnMutex); + fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n", + ret); + return -ret; + } + fprintf(stderr, "fuseConnectInit: initialized with timer period %d, " + "expiry period %d\n", gTimerPeriod, gExpiryPeriod); + return 0; +}
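Two details of fuseConnectInit are worth noting: hdfsConfGetInt leaves its out-parameter untouched when the key is missing, so the defaults seeded from FUSE_CONN_DEFAULT_TIMER_PERIOD and FUSE_CONN_DEFAULT_EXPIRY_PERIOD survive an absent configuration, and fuseConnect later converts the expiry period into a whole number of timer ticks with a ceiling division clamped to at least 2. A minimal sketch of that arithmetic (hypothetical helper name, not code from the patch):

#include <stdint.h>
#include <stdio.h>

/* How many timer ticks of length `period` cover at least `expiry` seconds?
 * Mirrors the expirationCount computation in fuseConnect:
 * (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod, floored at 2. */
static int32_t ticksForExpiry(int32_t expiry, int32_t period)
{
  int32_t ticks = (expiry + period - 1) / period;
  return ticks < 2 ? 2 : ticks;
}

int main(void)
{
  printf("%d\n", ticksForExpiry(300, 5)); /* defaults: 60 ticks */
  printf("%d\n", ticksForExpiry(7, 5));   /* ceil(7/5) = 2 */
  printf("%d\n", ticksForExpiry(4, 5));   /* 1, clamped to 2 */
  return 0;
}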
the configured value for %s.", + HADOOP_SECURITY_AUTHENTICATION); + return -EINVAL; + } + gPort = port; + gUri = strdup(nnUri); + if (!gUri) { + fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n"); + return -ENOMEM; + } + ret = pthread_mutex_init(&gConnMutex, NULL); + if (ret) { + free(gUri); + fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n", + ret); + return -ret; + } + RB_INIT(&gConnTree); + ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL); + if (ret) { + free(gUri); + pthread_mutex_destroy(&gConnMutex); + fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n", + ret); + return -ret; + } + fprintf(stderr, "fuseConnectInit: initialized with timer period %d, " + "expiry period %d\n", gTimerPeriod, gExpiryPeriod); + return 0; +} + +/** + * Compare two libhdfs connections by username + * + * @param a The first libhdfs connection + * @param b The second libhdfs connection + * + * @return -1, 0, or 1 depending on a < b, a ==b, a > b + */ +static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b) +{ + return strcmp(a->usrname, b->usrname); +} + +/** + * Find a libhdfs connection by username + * + * @param usrname The username to look up + * + * @return The connection, or NULL if none could be found + */ +static struct hdfsConn* hdfsConnFind(const char *usrname) +{ + struct hdfsConn exemplar; + + memset(&exemplar, 0, sizeof(exemplar)); + exemplar.usrname = (char*)usrname; + return RB_FIND(hdfsConnTree, &gConnTree, &exemplar); +} + +/** + * Free the resource associated with a libhdfs connection. + * + * You must remove the connection from the tree before calling this function. + * + * @param conn The libhdfs connection + */ +static void hdfsConnFree(struct hdfsConn *conn) +{ + int ret; + + ret = hdfsDisconnect(conn->fs); + if (ret) { + fprintf(stderr, "hdfsConnFree(username=%s): " + "hdfsDisconnect failed with error %d\n", + (conn->usrname ? conn->usrname : "(null)"), ret); + } + free(conn->usrname); + free(conn->kpath); + free(conn); +} + +/** + * Convert a time_t to a string. + * + * @param sec time in seconds since the epoch + * @param buf (out param) output buffer + * @param bufLen length of output buffer + * + * @return 0 on success; ENAMETOOLONG if the provided buffer was + * too short + */ +static int timeToStr(time_t sec, char *buf, size_t bufLen) +{ + struct tm tm, *out; + size_t l; + + if (bufLen < TIME_STR_LEN) { + return -ENAMETOOLONG; + } + out = localtime_r(&sec, &tm); + asctime_r(out, buf); + // strip trailing newline + l = strlen(buf); + if (l != 0) + buf[l - 1] = '\0'; + return 0; +} + +/** + * Check an HDFS connection's Kerberos path. + * + * If the mtime of the Kerberos ticket cache file has changed since we first + * opened the connection, mark the connection as condemned and remove it from + * the hdfs connection tree. + * + * @param conn The HDFS connection + */ +static int hdfsConnCheckKpath(const struct hdfsConn *conn) +{ + int ret; + struct stat st; + char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN]; + + if (stat(conn->kpath, &st) < 0) { + ret = errno; + if (ret == ENOENT) { + fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos " + "ticket cache file '%s' has disappeared. Condemning the " + "connection.\n", conn->usrname, conn->kpath); + } else { + fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) " + "failed with error code %d. 
+ +/** + * Cache expiration logic. + * + * This function is called periodically by the cache expiration thread. For + * each FUSE connection not currently in use (refcnt == 0) it will decrement the + * expirationCount for that connection. Once the expirationCount reaches 0 for + * a connection, it can be garbage collected. + * + * We also check to see if the Kerberos credentials have changed. If so, the + * connection is immediately condemned, even if it is currently in use. + */ +static void hdfsConnExpiry(void) +{ + struct hdfsConn *conn, *tmpConn; + + pthread_mutex_lock(&gConnMutex); + RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) { + if (conn->kpath) { + if (hdfsConnCheckKpath(conn)) { + conn->condemned = 1; + RB_REMOVE(hdfsConnTree, &gConnTree, conn); + if (conn->refcnt == 0) { + /* If the connection is not in use by any threads, delete it + * immediately. If it is still in use by some threads, the last + * thread using it will clean it up later inside hdfsConnRelease. */ + hdfsConnFree(conn); + continue; + } + } + } + if (conn->refcnt == 0) { + /* If the connection is not currently in use by a thread, check to see if + * it ought to be removed because it's too old. */ + conn->expirationCount--; + if (conn->expirationCount <= 0) { + if (conn->condemned) { + fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection " + "for %s is still in the tree!\n", conn->usrname); + } + fprintf(stderr, "hdfsConnExpiry: freeing and removing connection for " + "%s because it's now too old.\n", conn->usrname); + RB_REMOVE(hdfsConnTree, &gConnTree, conn); + hdfsConnFree(conn); + } + } + } + pthread_mutex_unlock(&gConnMutex); } /** @@ -129,9 +378,9 @@ static enum authConf discoverAuthConf(void) * @param path (out param) the path to the ticket cache file * @param pathLen length of the path buffer */ -static void findKerbTicketCachePath(char *path, size_t pathLen) +static void findKerbTicketCachePath(struct fuse_context *ctx, + char *path, size_t pathLen) { - struct fuse_context *ctx = fuse_get_context(); FILE *fp = NULL; static const char * const KRB5CCNAME = "\0KRB5CCNAME="; int c = '\0', pathIdx = 0, keyIdx = 0; @@ -168,72 +417,213 @@ done: } } -/* - * Connect to the NN as the current user/group. - * Returns a fs handle on success, or NULL on failure. +/** + * Create a new libhdfs connection.
+ * + * @param usrname Username to use for the new connection + * @param ctx FUSE context to use for the new connection + * @param out (out param) the new libhdfs connection + * + * @return 0 on success; error code otherwise */ -hdfsFS doConnectAsUser(const char *nn_uri, int nn_port) { - struct hdfsBuilder *bld; - uid_t uid = fuse_get_context()->uid; - char *user = getUsername(uid); - char kpath[PATH_MAX]; +static int fuseNewConnect(const char *usrname, struct fuse_context *ctx, + struct hdfsConn **out) +{ + struct hdfsBuilder *bld = NULL; + char kpath[PATH_MAX] = { 0 }; + struct hdfsConn *conn = NULL; int ret; - hdfsFS fs = NULL; - if (NULL == user) { - goto done; + struct stat st; + + conn = calloc(1, sizeof(struct hdfsConn)); + if (!conn) { + fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n"); + ret = -ENOMEM; + goto error; } - - ret = pthread_mutex_lock(&tableMutex); - assert(0 == ret); - - fs = findFs(user); - if (NULL == fs) { - if (hdfsAuthConf == AUTH_CONF_UNKNOWN) { - hdfsAuthConf = discoverAuthConf(); - if (hdfsAuthConf == AUTH_CONF_UNKNOWN) { - ERROR("Unable to determine the configured value for %s.", - HADOOP_SECURITY_AUTHENTICATION); - goto done; - } + bld = hdfsNewBuilder(); + if (!bld) { + fprintf(stderr, "Unable to create hdfs builder\n"); + ret = -ENOMEM; + goto error; + } + /* We always want to get a new FileSystem instance here-- that's why we call + * hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic + * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the + * existing cached connection which we wanted to get rid of. + */ + hdfsBuilderSetForceNewInstance(bld); + hdfsBuilderSetNameNode(bld, gUri); + if (gPort) { + hdfsBuilderSetNameNodePort(bld, gPort); + } + hdfsBuilderSetUserName(bld, usrname); + if (gHdfsAuthConf == AUTH_CONF_KERBEROS) { + findKerbTicketCachePath(ctx, kpath, sizeof(kpath)); + if (stat(kpath, &st) < 0) { + fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache " + "file '%s'. 
Did you remember to kinit for UID %d?\n", + kpath, ctx->uid); + ret = -EACCES; + goto error; } - bld = hdfsNewBuilder(); - if (!bld) { - ERROR("Unable to create hdfs builder"); - goto done; - } - hdfsBuilderSetForceNewInstance(bld); - hdfsBuilderSetNameNode(bld, nn_uri); - if (nn_port) { - hdfsBuilderSetNameNodePort(bld, nn_port); - } - hdfsBuilderSetUserName(bld, user); - if (hdfsAuthConf == AUTH_CONF_KERBEROS) { - findKerbTicketCachePath(kpath, sizeof(kpath)); - hdfsBuilderSetKerbTicketCachePath(bld, kpath); - } - fs = hdfsBuilderConnect(bld); - if (NULL == fs) { - int err = errno; - ERROR("Unable to create fs for user %s: error code %d", user, err); - goto done; - } - if (-1 == insertFs(user, fs)) { - ERROR("Unable to cache fs for user %s", user); + conn->kPathMtime = st.st_mtim.tv_sec; + conn->kPathMtimeNs = st.st_mtim.tv_nsec; + hdfsBuilderSetKerbTicketCachePath(bld, kpath); + conn->kpath = strdup(kpath); + if (!conn->kpath) { + fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n"); + ret = -ENOMEM; + goto error; } } + conn->usrname = strdup(usrname); + if (!conn->usrname) { + fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n"); + ret = -ENOMEM; + goto error; + } + conn->fs = hdfsBuilderConnect(bld); + bld = NULL; + if (!conn->fs) { + ret = errno; + fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: " + "error code %d\n", usrname, ret); + goto error; + } + RB_INSERT(hdfsConnTree, &gConnTree, conn); + *out = conn; + return 0; -done: - ret = pthread_mutex_unlock(&tableMutex); - assert(0 == ret); - free(user); - return fs; +error: + if (bld) { + hdfsFreeBuilder(bld); + } + if (conn) { + free(conn->kpath); + free(conn->usrname); + free(conn); + } + return ret; } -/* - * We currently cache a fs handle per-user in this module rather - * than use the FileSystem cache in the java client. Therefore - * we do not disconnect the fs handle here. - */ -int doDisconnect(hdfsFS fs) { +int fuseConnect(const char *usrname, struct fuse_context *ctx, + struct hdfsConn **out) +{ + int ret; + struct hdfsConn* conn; + + pthread_mutex_lock(&gConnMutex); + conn = hdfsConnFind(usrname); + if (!conn) { + ret = fuseNewConnect(usrname, ctx, &conn); + if (ret) { + pthread_mutex_unlock(&gConnMutex); + fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with " + "error code %d\n", usrname, ret); + return ret; + } + } + conn->refcnt++; + conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod; + if (conn->expirationCount < 2) + conn->expirationCount = 2; + pthread_mutex_unlock(&gConnMutex); + *out = conn; return 0; } + +int fuseConnectAsThreadUid(struct hdfsConn **conn) +{ + struct fuse_context *ctx; + char *usrname; + int ret; + + ctx = fuse_get_context(); + usrname = getUsername(ctx->uid); + ret = fuseConnect(usrname, ctx, conn); + free(usrname); + return ret; +} + +int fuseConnectTest(void) +{ + int ret; + struct hdfsConn *conn; + + if (gHdfsAuthConf == AUTH_CONF_KERBEROS) { + // TODO: call some method which can tell us whether the FS exists. In order + // to implement this, we have to add a method to FileSystem in order to do + // this without valid Kerberos authentication. See HDFS-3674 for details. 
+ return 0; + } + ret = fuseNewConnect("root", NULL, &conn); + if (ret) { + fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret); + return ret; + } + hdfsConnRelease(conn); + return 0; +} + +struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn) +{ + return conn->fs; +} + +void hdfsConnRelease(struct hdfsConn *conn) +{ + pthread_mutex_lock(&gConnMutex); + conn->refcnt--; + if ((conn->refcnt == 0) && (conn->condemned)) { + fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemned FS!\n", + conn->usrname); + /* Notice that we're not removing the connection from gConnTree here. + * If the connection is condemned, it must have already been removed from + * the tree, so that no other threads start using it. + */ + hdfsConnFree(conn); + } + pthread_mutex_unlock(&gConnMutex); +} + +/** + * Get the monotonic time. + * + * Unlike the wall-clock time, monotonic time only ever goes forward. If the + * user adjusts the time, the monotonic time will not be affected. + * + * @return The monotonic time + */ +static time_t getMonotonicTime(void) +{ + int res; + struct timespec ts; + + res = clock_gettime(CLOCK_MONOTONIC, &ts); + if (res) + abort(); + return ts.tv_sec; +} + +/** + * FUSE connection expiration thread + */ +static void* hdfsConnExpiryThread(void *v) +{ + time_t nextTime, curTime; + int waitTime; + + nextTime = getMonotonicTime() + gTimerPeriod; + while (1) { + curTime = getMonotonicTime(); + if (curTime >= nextTime) { + hdfsConnExpiry(); + nextTime = curTime + gTimerPeriod; + } + waitTime = (nextTime - curTime) * 1000; + poll(NULL, 0, waitTime); + } + return NULL; +}
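hdfsConnExpiryThread schedules itself off CLOCK_MONOTONIC and sleeps with poll(NULL, 0, ms), so wall-clock adjustments can neither starve nor spin the expiry pass. A standalone skeleton of that loop (bounded to three passes here; the patch's version runs forever):

#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

static time_t monotonicNow(void)
{
  struct timespec ts;

  /* CLOCK_MONOTONIC is immune to settimeofday() and NTP steps. */
  if (clock_gettime(CLOCK_MONOTONIC, &ts))
    abort();
  return ts.tv_sec;
}

int main(void)
{
  const int period = 1; /* seconds; the patch defaults to 5 */
  time_t next = monotonicNow() + period;
  int passes = 0;

  while (passes < 3) {
    time_t now = monotonicNow();
    if (now >= next) {
      printf("expiry pass %d\n", ++passes); /* hdfsConnExpiry() goes here */
      next = now + period;
    }
    /* poll() with no file descriptors is a portable millisecond sleep. */
    poll(NULL, 0, (int)(next - now) * 1000);
  }
  return 0;
}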
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h index 4bddeeae9f..35645c66b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h @@ -19,10 +19,72 @@ #ifndef __FUSE_CONNECT_H__ #define __FUSE_CONNECT_H__ -#include "fuse_dfs.h" +struct fuse_context; +struct hdfsConn; +struct hdfs_internal; -hdfsFS doConnectAsUser(const char *nn_uri, int nn_port); -int doDisconnect(hdfsFS fs); -int allocFsTable(void); +/** + * Initialize the fuse connection subsystem. + * + * This must be called before any of the other functions in this module. + * + * @param nnUri The NameNode URI + * @param port The NameNode port + * + * @return 0 on success; error code otherwise + */ +int fuseConnectInit(const char *nnUri, int port); + +/** + * Get a libhdfs connection. + * + * If there is an existing connection, it will be reused. If not, a new one + * will be created. + * + * You must call hdfsConnRelease on the connection you get back! + * + * @param usrname The username to use + * @param ctx The FUSE context to use (contains UID, PID of requestor) + * @param out (out param) The HDFS connection + * + * @return 0 on success; error code otherwise + */ +int fuseConnect(const char *usrname, struct fuse_context *ctx, + struct hdfsConn **out); + +/** + * Get a libhdfs connection. + * + * The same as fuseConnect, except the username will be determined from the FUSE + * thread context. + * + * @param conn (out param) The HDFS connection + * + * @return 0 on success; error code otherwise + */ +int fuseConnectAsThreadUid(struct hdfsConn **conn); + +/** + * Test whether we can connect to the HDFS cluster + * + * @return 0 on success; error code otherwise + */ +int fuseConnectTest(void); + +/** + * Get the hdfsFS associated with an hdfsConn. + * + * @param conn The hdfsConn + * + * @return the hdfsFS + */ +struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn); + +/** + * Release an hdfsConn when we're done with it. + * + * @param conn The hdfsConn + */ +void hdfsConnRelease(struct hdfsConn *conn); #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h index ae07735d88..f2b48be29c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h @@ -31,8 +31,6 @@ // typedef struct dfs_context_struct { int debug; - char *nn_uri; - int nn_port; int read_only; int usetrash; int direct_io; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c index e218c81c9e..1a8ede6348 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c @@ -65,8 +65,19 @@ static struct fuse_operations dfs_oper = { .truncate = dfs_truncate, }; +static void print_env_vars(void) +{ + const char *cp = getenv("CLASSPATH"); + const char *ld = getenv("LD_LIBRARY_PATH"); + + fprintf(stderr, "LD_LIBRARY_PATH=%s\n", ld == NULL ? "NULL" : ld); + fprintf(stderr, "CLASSPATH=%s\n", cp == NULL ? "NULL" : cp); +} + int main(int argc, char *argv[]) { + int ret; + umask(0); extern const char *program; @@ -106,24 +117,22 @@ int main(int argc, char *argv[]) exit(0); } - // Check connection as root + ret = fuseConnectInit(options.nn_uri, options.nn_port); + if (ret) { + ERROR("FATAL: fuseConnectInit failed with error %d!", ret); + print_env_vars(); + exit(EXIT_FAILURE); + } if (options.initchecks == 1) { - hdfsFS tempFS = hdfsConnectAsUser(options.nn_uri, options.nn_port, "root"); - if (NULL == tempFS) { - const char *cp = getenv("CLASSPATH"); - const char *ld = getenv("LD_LIBRARY_PATH"); - ERROR("FATAL: misconfiguration - cannot connect to HDFS"); - ERROR("LD_LIBRARY_PATH=%s",ld == NULL ? "NULL" : ld); - ERROR("CLASSPATH=%s",cp == NULL ? "NULL" : cp); - exit(1); - } - if (doDisconnect(tempFS)) { - ERROR("FATAL: unable to disconnect from test filesystem."); - exit(1); + ret = fuseConnectTest(); + if (ret) { + ERROR("FATAL: fuseConnectTest failed with error %d!", ret); + print_env_vars(); + exit(EXIT_FAILURE); } } - int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL); + ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL); fuse_opt_free_args(&args); return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h index 70cd8983a8..7f9346c1e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h @@ -22,6 +22,8 @@ #include <hdfs.h> #include <pthread.h> +struct hdfsConn; + /** * * dfs_fh_struct is passed around for open files.
Fuse provides a hook (the context) @@ -34,10 +36,10 @@ */ typedef struct dfs_fh_struct { hdfsFile hdfsFH; + struct hdfsConn *conn; char *buf; tSize bufferSize; //what is the size of the buffer we have off_t buffersStartOffset; //where the buffer starts in the file - hdfsFS fs; // for reads/writes need to access as the real user pthread_mutex_t mutex; } dfs_fh; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c index 2c1e96b2c1..8c25f53b6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c @@ -23,6 +23,8 @@ int dfs_chmod(const char *path, mode_t mode) { + struct hdfsConn *conn = NULL; + hdfsFS fs; TRACE1("chmod", path) int ret = 0; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; @@ -31,22 +33,24 @@ int dfs_chmod(const char *path, mode_t mode) assert(dfs); assert('/' == *path); - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect to HDFS"); + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); ret = -EIO; goto cleanup; } + fs = hdfsConnGetFs(conn); - if (hdfsChmod(userFS, path, (short)mode)) { + if (hdfsChmod(fs, path, (short)mode)) { ERROR("Could not chmod %s to %d", path, (int)mode); ret = (errno > 0) ? -errno : -EIO; goto cleanup; } cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c index a14ca7137e..2a6b61c027 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c @@ -25,12 +25,12 @@ int dfs_chown(const char *path, uid_t uid, gid_t gid) { - TRACE1("chown", path) - + struct hdfsConn *conn = NULL; int ret = 0; char *user = NULL; char *group = NULL; - hdfsFS userFS = NULL; + + TRACE1("chown", path) // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; @@ -61,14 +61,15 @@ int dfs_chown(const char *path, uid_t uid, gid_t gid) } } - userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect to HDFS"); + ret = fuseConnect(user, fuse_get_context(), &conn); + if (ret) { + fprintf(stderr, "fuseConnect: failed to open a libhdfs connection! " + "error %d.\n", ret); ret = -EIO; goto cleanup; } - if (hdfsChown(userFS, path, user, group)) { + if (hdfsChown(hdfsConnGetFs(conn), path, user, group)) { ret = errno; ERROR("Could not chown %s to %d:%d: error %d", path, (int)uid, gid, ret); ret = (ret > 0) ? 
-ret : -EIO; @@ -76,16 +77,11 @@ int dfs_chown(const char *path, uid_t uid, gid_t gid) } cleanup: - if (userFS && doDisconnect(userFS)) { - ret = -EIO; - } - if (user) { - free(user); - } - if (group) { - free(group); + if (conn) { + hdfsConnRelease(conn); } + free(user); + free(group); return ret; - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c index 6d4f05c4ec..adb065b229 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c @@ -16,6 +16,7 @@ * limitations under the License. */ +#include "fuse_connect.h" #include "fuse_dfs.h" #include "fuse_impls.h" #include "fuse_file_handle.h" @@ -43,9 +44,7 @@ int dfs_flush(const char *path, struct fuse_file_info *fi) { assert(fh); hdfsFile file_handle = (hdfsFile)fh->hdfsFH; assert(file_handle); - - assert(fh->fs); - if (hdfsFlush(fh->fs, file_handle) != 0) { + if (hdfsFlush(hdfsConnGetFs(fh->conn), file_handle) != 0) { ERROR("Could not flush %lx for %s\n",(long)file_handle, path); return -EIO; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c index 56f634e71b..2e435185ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c @@ -23,22 +23,27 @@ int dfs_getattr(const char *path, struct stat *st) { + struct hdfsConn *conn = NULL; + hdfsFS fs; + int ret; + hdfsFileInfo *info; + TRACE1("getattr", path) - dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; - assert(dfs); assert(path); assert(st); - hdfsFS fs = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (NULL == fs) { - ERROR("Could not connect to %s:%d", dfs->nn_uri, dfs->nn_port); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! 
error %d.\n", ret); + ret = -EIO; + goto cleanup; } - - int ret = 0; - hdfsFileInfo *info = hdfsGetPathInfo(fs,path); + fs = hdfsConnGetFs(conn); + + info = hdfsGetPathInfo(fs,path); if (NULL == info) { ret = -ENOENT; goto cleanup; @@ -63,9 +68,8 @@ int dfs_getattr(const char *path, struct stat *st) hdfsFreeFileInfo(info,1); cleanup: - if (doDisconnect(fs)) { - ERROR("Could not disconnect from filesystem"); - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c index d0624afa1c..3aef108109 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c @@ -23,9 +23,12 @@ int dfs_mkdir(const char *path, mode_t mode) { - TRACE1("mkdir", path) - + struct hdfsConn *conn = NULL; + hdfsFS fs; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + int ret; + + TRACE1("mkdir", path) assert(path); assert(dfs); @@ -41,29 +44,32 @@ int dfs_mkdir(const char *path, mode_t mode) return -EACCES; } - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto cleanup; } + fs = hdfsConnGetFs(conn); // In theory the create and chmod should be atomic. - int ret = 0; - if (hdfsCreateDirectory(userFS, path)) { + if (hdfsCreateDirectory(fs, path)) { ERROR("HDFS could not create directory %s", path); ret = (errno > 0) ? -errno : -EIO; goto cleanup; } - if (hdfsChmod(userFS, path, (short)mode)) { + if (hdfsChmod(fs, path, (short)mode)) { ERROR("Could not chmod %s to %d", path, (int)mode); ret = (errno > 0) ? 
-errno : -EIO; + } + ret = 0; cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c index 071590aa37..ecd772f63f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c @@ -21,38 +21,45 @@ #include "fuse_connect.h" #include "fuse_file_handle.h" +#include <stdio.h> +#include <stdlib.h> + int dfs_open(const char *path, struct fuse_file_info *fi) { - TRACE1("open", path) - + hdfsFS fs = NULL; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + dfs_fh *fh = NULL; + int mutexInit = 0, ret; + + TRACE1("open", path) // check params and the context var assert(path); assert('/' == *path); assert(dfs); - int ret = 0; - // 0x8000 is always passed in and hadoop doesn't like it, so killing it here // bugbug figure out what this flag is and report problem to Hadoop JIRA int flags = (fi->flags & 0x7FFF); // retrieve dfs specific data - dfs_fh *fh = (dfs_fh*)calloc(1, sizeof (dfs_fh)); - if (fh == NULL) { + fh = (dfs_fh*)calloc(1, sizeof (dfs_fh)); + if (!fh) { ERROR("Malloc of new file handle failed"); - return -EIO; + ret = -EIO; + goto error; } - - fh->fs = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (fh->fs == NULL) { - ERROR("Could not connect to dfs"); - return -EIO; + ret = fuseConnectAsThreadUid(&fh->conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto error; } + fs = hdfsConnGetFs(fh->conn); if (flags & O_RDWR) { - hdfsFileInfo *info = hdfsGetPathInfo(fh->fs,path); + hdfsFileInfo *info = hdfsGetPathInfo(fs, path); if (info == NULL) { // File does not exist (maybe?); interpret it as a O_WRONLY // If the actual error was something else, we'll get it again when @@ -66,15 +73,23 @@ int dfs_open(const char *path, struct fuse_file_info *fi) } } - if ((fh->hdfsFH = hdfsOpenFile(fh->fs, path, flags, 0, 0, 0)) == NULL) { + if ((fh->hdfsFH = hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) { ERROR("Could not open file %s (errno=%d)", path, errno); if (errno == 0 || errno == EINTERNAL) { - return -EIO; + ret = -EIO; + goto error; } - return -errno; + ret = -errno; + goto error; } - pthread_mutex_init(&fh->mutex, NULL); + ret = pthread_mutex_init(&fh->mutex, NULL); + if (ret) { + fprintf(stderr, "dfs_open: error initializing mutex: error %d\n", ret); + ret = -EIO; + goto error; + } + mutexInit = 1; if (fi->flags & O_WRONLY || fi->flags & O_CREAT) { fh->buf = NULL; @@ -84,11 +99,27 @@ int dfs_open(const char *path, struct fuse_file_info *fi) if (NULL == fh->buf) { ERROR("Could not allocate memory for a read for file %s\n", path); ret = -EIO; + goto error; } fh->buffersStartOffset = 0; fh->bufferSize = 0; } fi->fh = (uint64_t)fh; + return 0; +error: + if (fh) { + if (mutexInit) { + pthread_mutex_destroy(&fh->mutex); + } + free(fh->buf); + if (fh->hdfsFH) { + hdfsCloseFile(fs, fh->hdfsFH); + } + if (fh->conn) { + hdfsConnRelease(fh->conn); + } + free(fh); + } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c index 52092616c0..feade45403 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c @@ -16,9 +16,10 @@ * limitations under the License. */ +#include "fuse_connect.h" #include "fuse_dfs.h" -#include "fuse_impls.h" #include "fuse_file_handle.h" +#include "fuse_impls.h" static size_t min(const size_t x, const size_t y) { return x < y ? x : y; @@ -48,9 +49,9 @@ int dfs_read(const char *path, char *buf, size_t size, off_t offset, assert(fi); dfs_fh *fh = (dfs_fh*)fi->fh; + hdfsFS fs = hdfsConnGetFs(fh->conn); assert(fh != NULL); - assert(fh->fs != NULL); assert(fh->hdfsFH != NULL); // special case this as simplifies the rest of the logic to know the caller wanted > 0 bytes @@ -61,7 +62,7 @@ int dfs_read(const char *path, char *buf, size_t size, off_t offset, if ( size >= dfs->rdbuffer_size) { int num_read; size_t total_read = 0; - while (size - total_read > 0 && (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) { + while (size - total_read > 0 && (num_read = hdfsPread(fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) { total_read += num_read; } // if there was an error before satisfying the current read, this logic declares it an error @@ -98,7 +99,7 @@ int dfs_read(const char *path, char *buf, size_t size, off_t offset, size_t total_read = 0; while (dfs->rdbuffer_size - total_read > 0 && - (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) { + (num_read = hdfsPread(fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) { total_read += num_read; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c index f6fe48b6ad..326f573050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c @@ -24,25 +24,31 @@ int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) { - TRACE1("readdir", path) + int ret; + struct hdfsConn *conn = NULL; + hdfsFS fs; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + TRACE1("readdir", path) + assert(dfs); assert(path); assert(buf); - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto cleanup; } + fs = hdfsConnGetFs(conn); // Read dirents. Calling a variant that just returns the final path // component (HDFS-975) would save us from parsing it out below. int numEntries = 0; - hdfsFileInfo *info = hdfsListDirectory(userFS, path, &numEntries); + hdfsFileInfo *info = hdfsListDirectory(fs, path, &numEntries); - int ret = 0; // NULL means either the directory doesn't exist or maybe IO error. if (NULL == info) { ret = (errno > 0) ? 
-errno : -ENOENT; @@ -106,11 +112,11 @@ int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, } // free the info pointers hdfsFreeFileInfo(info,numEntries); + ret = 0; cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; - ERROR("Failed to disconnect %d", errno); + if (conn) { + hdfsConnRelease(conn); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c index e15dd572d6..0316de6141 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c @@ -52,15 +52,13 @@ int dfs_release (const char *path, struct fuse_file_info *fi) { assert(fh); hdfsFile file_handle = (hdfsFile)fh->hdfsFH; if (NULL != file_handle) { - if (hdfsCloseFile(fh->fs, file_handle) != 0) { + if (hdfsCloseFile(hdfsConnGetFs(fh->conn), file_handle) != 0) { ERROR("Could not close handle %ld for %s\n",(long)file_handle, path); ret = -EIO; } } free(fh->buf); - if (doDisconnect(fh->fs)) { - ret = -EIO; - } + hdfsConnRelease(fh->conn); pthread_mutex_destroy(&fh->mutex); free(fh); fi->fh = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c index bbb0462b03..415539f0ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c @@ -23,10 +23,12 @@ int dfs_rename(const char *from, const char *to) { - TRACE1("rename", from) - - // retrieve dfs specific data + struct hdfsConn *conn = NULL; + hdfsFS fs; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + int ret; + + TRACE1("rename", from) // check params and the context var assert(from); @@ -46,23 +48,24 @@ int dfs_rename(const char *from, const char *to) return -EACCES; } - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto cleanup; } - - int ret = 0; - if (hdfsRename(userFS, from, to)) { + fs = hdfsConnGetFs(conn); + if (hdfsRename(fs, from, to)) { ERROR("Rename %s to %s failed", from, to); ret = (errno > 0) ? 
-errno : -EIO; goto cleanup; } + ret = 0; cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c index 259040fe9f..f79562a8b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c @@ -25,9 +25,14 @@ extern const char *const TrashPrefixDir; int dfs_rmdir(const char *path) { - TRACE1("rmdir", path) - + struct hdfsConn *conn = NULL; + hdfsFS fs; + int ret; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + int numEntries = 0; + hdfsFileInfo *info = NULL; + + TRACE1("rmdir", path) assert(path); assert(dfs); @@ -35,42 +40,43 @@ int dfs_rmdir(const char *path) if (is_protected(path)) { ERROR("Trying to delete protected directory %s", path); - return -EACCES; + ret = -EACCES; + goto cleanup; } if (dfs->read_only) { ERROR("HDFS configured read-only, cannot delete directory %s", path); - return -EACCES; + ret = -EACCES; + goto cleanup; } - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto cleanup; } - - int ret = 0; - int numEntries = 0; - hdfsFileInfo *info = hdfsListDirectory(userFS,path,&numEntries); - - if (info) { - hdfsFreeFileInfo(info, numEntries); - } - + fs = hdfsConnGetFs(conn); + info = hdfsListDirectory(fs, path, &numEntries); if (numEntries) { ret = -ENOTEMPTY; goto cleanup; } - if (hdfsDeleteWithTrash(userFS, path, dfs->usetrash)) { + if (hdfsDeleteWithTrash(fs, path, dfs->usetrash)) { ERROR("Error trying to delete directory %s", path); ret = -EIO; goto cleanup; } + ret = 0; cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (info) { + hdfsFreeFileInfo(info, numEntries); + } + if (conn) { + hdfsConnRelease(conn); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c index c7004a9699..c9306a2127 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c @@ -23,9 +23,12 @@ int dfs_statfs(const char *path, struct statvfs *st) { - TRACE1("statfs",path) - + struct hdfsConn *conn = NULL; + hdfsFS fs; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + int ret; + + TRACE1("statfs",path) assert(path); assert(st); @@ -33,19 +36,18 @@ int dfs_statfs(const char *path, struct statvfs *st) memset(st,0,sizeof(struct statvfs)); - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! 
error %d.\n", ret); + ret = -EIO; + goto cleanup; } + fs = hdfsConnGetFs(conn); - const tOffset cap = hdfsGetCapacity(userFS); - const tOffset used = hdfsGetUsed(userFS); - const tOffset bsize = hdfsGetDefaultBlockSize(userFS); - - if (doDisconnect(userFS)) { - return -EIO; - } + const tOffset cap = hdfsGetCapacity(fs); + const tOffset used = hdfsGetUsed(fs); + const tOffset bsize = hdfsGetDefaultBlockSize(fs); st->f_bsize = bsize; st->f_frsize = bsize; @@ -58,6 +60,11 @@ int dfs_statfs(const char *path, struct statvfs *st) st->f_fsid = 1023; st->f_flag = ST_RDONLY | ST_NOSUID; st->f_namemax = 1023; + ret = 0; - return 0; +cleanup: + if (conn) { + hdfsConnRelease(conn); + } + return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c index d09b0c8fa6..bf72ca6bec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c @@ -28,10 +28,12 @@ */ int dfs_truncate(const char *path, off_t size) { - TRACE1("truncate", path) - + struct hdfsConn *conn = NULL; + hdfsFS fs; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + TRACE1("truncate", path) + assert(path); assert('/' == *path); assert(dfs); @@ -45,31 +47,33 @@ int dfs_truncate(const char *path, off_t size) return ret; } - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); ret = -EIO; goto cleanup; } + fs = hdfsConnGetFs(conn); int flags = O_WRONLY | O_CREAT; hdfsFile file; - if ((file = (hdfsFile)hdfsOpenFile(userFS, path, flags, 0, 0, 0)) == NULL) { + if ((file = (hdfsFile)hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) { ERROR("Could not open file %s", path); ret = -EIO; goto cleanup; } - if (hdfsCloseFile(userFS, file) != 0) { + if (hdfsCloseFile(fs, file) != 0) { ERROR("Could not close file %s", path); ret = -EIO; goto cleanup; } cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c index a3d2034dbb..102c2cd0f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c @@ -20,44 +20,51 @@ #include "fuse_impls.h" #include "fuse_connect.h" #include "fuse_trash.h" -extern const char *const TrashPrefixDir; int dfs_unlink(const char *path) { - TRACE1("unlink", path) - + struct hdfsConn *conn = NULL; + hdfsFS fs; int ret = 0; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + TRACE1("unlink", path) + assert(path); assert(dfs); assert('/' == *path); if (is_protected(path)) { ERROR("Trying to delete protected directory %s", path); - return -EACCES; + ret = -EACCES; + goto cleanup; } if (dfs->read_only) { ERROR("HDFS configured read-only, cannot delete file %s", path); - return -EACCES; + ret = -EACCES; + goto cleanup; } - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto cleanup; } + fs = hdfsConnGetFs(conn); - if (hdfsDeleteWithTrash(userFS, path, dfs->usetrash)) { + if (hdfsDeleteWithTrash(fs, path, dfs->usetrash)) { ERROR("Could not delete file %s", path); ret = (errno > 0) ? -errno : -EIO; goto cleanup; } + ret = 0; cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c index f9144f8451..dccff92c3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c @@ -22,10 +22,13 @@ int dfs_utimens(const char *path, const struct timespec ts[2]) { - TRACE1("utimens", path) + struct hdfsConn *conn = NULL; + hdfsFS fs; int ret = 0; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; + TRACE1("utimens", path) + assert(path); assert(dfs); assert('/' == *path); @@ -33,14 +36,17 @@ int dfs_utimens(const char *path, const struct timespec ts[2]) time_t aTime = ts[0].tv_sec; time_t mTime = ts[1].tv_sec; - hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port); - if (userFS == NULL) { - ERROR("Could not connect"); - return -EIO; + ret = fuseConnectAsThreadUid(&conn); + if (ret) { + fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs " + "connection! error %d.\n", ret); + ret = -EIO; + goto cleanup; } + fs = hdfsConnGetFs(conn); - if (hdfsUtime(userFS, path, mTime, aTime)) { - hdfsFileInfo *info = hdfsGetPathInfo(userFS, path); + if (hdfsUtime(fs, path, mTime, aTime)) { + hdfsFileInfo *info = hdfsGetPathInfo(fs, path); if (info == NULL) { ret = (errno > 0) ? -errno : -ENOENT; goto cleanup; @@ -54,10 +60,11 @@ int dfs_utimens(const char *path, const struct timespec ts[2]) } goto cleanup; } + ret = 0; cleanup: - if (doDisconnect(userFS)) { - ret = -EIO; + if (conn) { + hdfsConnRelease(conn); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c index 8bb0454888..3090e9e32b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c @@ -16,6 +16,7 @@ * limitations under the License. 
*/ +#include "fuse_connect.h" #include "fuse_dfs.h" #include "fuse_impls.h" #include "fuse_file_handle.h" @@ -48,15 +49,15 @@ int dfs_write(const char *path, const char *buf, size_t size, pthread_mutex_lock(&fh->mutex); tSize length = 0; - assert(fh->fs); + hdfsFS fs = hdfsConnGetFs(fh->conn); - tOffset cur_offset = hdfsTell(fh->fs, file_handle); + tOffset cur_offset = hdfsTell(fs, file_handle); if (cur_offset != offset) { ERROR("User trying to random access write to a file %d != %d for %s", (int)cur_offset, (int)offset, path); ret = -ENOTSUP; } else { - length = hdfsWrite(fh->fs, file_handle, buf, size); + length = hdfsWrite(fs, file_handle, buf, size); if (length <= 0) { ERROR("Could not write all bytes for %s %d != %d (errno=%d)", path, length, (int)size, errno); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c index 6c1c0d058a..aeb5f386a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c @@ -78,8 +78,20 @@ void init_protectedpaths(dfs_context *dfs) { dfs->protectedpaths[j] = NULL; } +static void dfsPrintOptions(FILE *fp, const struct options *o) +{ + fprintf(fp, "[ protected=%s, nn_uri=%s, nn_port=%d, " + "debug=%d, read_only=%d, initchecks=%d, " + "no_permissions=%d, usetrash=%d, entry_timeout=%d, " + "attribute_timeout=%d, rdbuffer_size=%zd, direct_io=%d ]", + (o->protected ? o->protected : "(NULL)"), o->nn_uri, o->nn_port, + o->debug, o->read_only, o->initchecks, + o->no_permissions, o->usetrash, o->entry_timeout, + o->attribute_timeout, o->rdbuffer_size, o->direct_io); +} -void *dfs_init(void) { +void *dfs_init(void) +{ // // Create a private struct of data we will pass to fuse here and which // will then be accessible on every call. @@ -92,15 +104,15 @@ void *dfs_init(void) { // initialize the context dfs->debug = options.debug; - dfs->nn_uri = options.nn_uri; - dfs->nn_port = options.nn_port; dfs->read_only = options.read_only; dfs->usetrash = options.usetrash; dfs->protectedpaths = NULL; dfs->rdbuffer_size = options.rdbuffer_size; dfs->direct_io = options.direct_io; - INFO("Mounting.
nn_uri=%s, nn_port=%d", dfs->nn_uri, dfs->nn_port); + fprintf(stderr, "Mounting with options "); + dfsPrintOptions(stderr, &options); + fprintf(stderr, "\n"); init_protectedpaths(dfs); assert(dfs->protectedpaths != NULL); @@ -109,12 +121,6 @@ void *dfs_init(void) { DEBUG("dfs->rdbuffersize <= 0 = %ld", dfs->rdbuffer_size); dfs->rdbuffer_size = 32768; } - - if (0 != allocFsTable()) { - ERROR("FATAL: could not allocate "); - exit(1); - } - return (void*)dfs; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c index eba2bf1b66..70468f80c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c @@ -225,7 +225,7 @@ done: * * @return 0 on success; error code otherwise */ -static int hadoopConfSet(JNIEnv *env, jobject jConfiguration, +static int hadoopConfSetStr(JNIEnv *env, jobject jConfiguration, const char *key, const char *value) { int ret; @@ -283,7 +283,7 @@ static int jStrToCstr(JNIEnv *env, jstring jstr, char **cstr) return 0; } -static int hadoopConfGet(JNIEnv *env, jobject jConfiguration, +static int hadoopConfGetStr(JNIEnv *env, jobject jConfiguration, const char *key, char **val) { int ret; @@ -301,7 +301,7 @@ static int hadoopConfGet(JNIEnv *env, jobject jConfiguration, HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING)), jkey); if (ret) { - snprintf(buf, sizeof(buf), "hadoopConfGet(%s)", key); + snprintf(buf, sizeof(buf), "hadoopConfGetStr(%s)", key); ret = errnoFromException(jExc, env, buf); goto done; } @@ -321,7 +321,7 @@ done: return ret; } -int hdfsConfGet(const char *key, char **val) +int hdfsConfGetStr(const char *key, char **val) { JNIEnv *env; int ret; @@ -339,19 +339,67 @@ int hdfsConfGet(const char *key, char **val) ret = EINTERNAL; goto done; } - ret = hadoopConfGet(env, jConfiguration, key, val); - if (ret) - goto done; - ret = 0; + ret = hadoopConfGetStr(env, jConfiguration, key, val); done: - if (jConfiguration) - destroyLocalReference(env, jConfiguration); + destroyLocalReference(env, jConfiguration); if (ret) errno = ret; return ret; } -void hdfsConfFree(char *val) +static int hadoopConfGetInt(JNIEnv *env, jobject jConfiguration, + const char *key, int32_t *val) +{ + int ret; + jthrowable jExc = NULL; + jvalue jVal; + jstring jkey = NULL; + char buf[1024]; + + jkey = (*env)->NewStringUTF(env, key); + if (!jkey) { + (*env)->ExceptionDescribe(env); + return ENOMEM; + } + ret = invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, + HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"), + jkey, (jint)(*val)); + destroyLocalReference(env, jkey); + if (ret) { + snprintf(buf, sizeof(buf), "hadoopConfGetInt(%s)", key); + return errnoFromException(jExc, env, buf); + } + *val = jVal.i; + return 0; +} + +int hdfsConfGetInt(const char *key, int32_t *val) +{ + JNIEnv *env; + int ret; + jobject jConfiguration = NULL; + + env = getJNIEnv(); + if (env == NULL) { + ret = EINTERNAL; + goto done; + } + jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); + if (jConfiguration == NULL) { + fprintf(stderr, "Can't construct instance of class " + "org.apache.hadoop.conf.Configuration\n"); + ret = EINTERNAL; + goto done; + } + ret = hadoopConfGetInt(env, jConfiguration, key, val); +done: + destroyLocalReference(env, jConfiguration); + if (ret) + errno = ret; + return ret; +} + +void hdfsConfStrFree(char *val) { free(val); } @@ -583,7 +631,7 @@ hdfsFS 
hdfsBuilderConnect(struct hdfsBuilder *bld) } if (bld->kerbTicketCachePath) { - ret = hadoopConfSet(env, jConfiguration, + ret = hadoopConfSetStr(env, jConfiguration, KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath); if (ret) goto done; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h index 5c3c6dfe2d..4f252cd8d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h @@ -220,21 +220,33 @@ extern "C" { * Get a configuration string. * * @param key The key to find - * @param val (out param) The value. This will be NULL if the + * @param val (out param) The value. This will be set to NULL if the * key isn't found. You must free this string with - * hdfsConfFree. + * hdfsConfStrFree. * * @return 0 on success; nonzero error code otherwise. * Failure to find the key is not an error. */ - int hdfsConfGet(const char *key, char **val); + int hdfsConfGetStr(const char *key, char **val); /** - * Free a configuration string found with hdfsConfGet. + * Get a configuration integer. * - * @param val A configuration string obtained from hdfsConfGet + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. */ - void hdfsConfFree(char *val); + int hdfsConfGetInt(const char *key, int32_t *val); + + /** + * Free a configuration string found with hdfsConfGetStr. + * + * @param val A configuration string obtained from hdfsConfGetStr + */ + void hdfsConfStrFree(char *val); /** * hdfsDisconnect - Disconnect from the hdfs file system. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c index 97f92942f9..d788f6282e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c @@ -412,6 +412,7 @@ char *classNameOfObject(jobject jobj, JNIEnv *env) { return newstr; } + /** * Get the global JNI environemnt. * @@ -500,6 +501,11 @@ static JNIEnv* getGlobalJNIEnv(void) "with error: %d\n", rv); return NULL; } + if (invokeMethod(env, NULL, NULL, STATIC, NULL, + "org/apache/hadoop/fs/FileSystem", + "loadFileSystems", "()V") != 0) { + (*env)->ExceptionDescribe(env); + } } else { //Attach this thread to the VM diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/tree.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/tree.h new file mode 100644 index 0000000000..ac78ee30af --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/tree.h @@ -0,0 +1,765 @@ +/* $NetBSD: tree.h,v 1.8 2004/03/28 19:38:30 provos Exp $ */ +/* $OpenBSD: tree.h,v 1.7 2002/10/17 21:51:54 art Exp $ */ +/* $FreeBSD: src/sys/sys/tree.h,v 1.9.4.1 2011/09/23 00:51:37 kensmith Exp $ */ + +/*- + * Copyright 2002 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. 
Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _SYS_TREE_H_
+#define _SYS_TREE_H_
+
+#include <sys/cdefs.h>
+
+/*
+ * This file defines data structures for different types of trees:
+ * splay trees and red-black trees.
+ *
+ * A splay tree is a self-organizing data structure.  Every operation
+ * on the tree causes a splay to happen.  The splay moves the requested
+ * node to the root of the tree and partly rebalances it.
+ *
+ * This has the benefit that request locality causes faster lookups as
+ * the requested nodes move to the top of the tree.  On the other hand,
+ * every lookup causes memory writes.
+ *
+ * The Balance Theorem bounds the total access time for m operations
+ * and n inserts on an initially empty tree as O((m + n)lg n).  The
+ * amortized cost for a sequence of m accesses to a splay tree is O(lg n).
+ *
+ * A red-black tree is a binary search tree with the node color as an
+ * extra attribute.  It fulfills a set of conditions:
+ *   - every search path from the root to a leaf consists of the
+ *     same number of black nodes,
+ *   - each red node (except for the root) has a black parent,
+ *   - each leaf node is black.
+ *
+ * Every operation on a red-black tree is bounded as O(lg n).
+ * The maximum height of a red-black tree is 2lg (n+1).
+ */ + +#define SPLAY_HEAD(name, type) \ +struct name { \ + struct type *sph_root; /* root of the tree */ \ +} + +#define SPLAY_INITIALIZER(root) \ + { NULL } + +#define SPLAY_INIT(root) do { \ + (root)->sph_root = NULL; \ +} while (/*CONSTCOND*/ 0) + +#define SPLAY_ENTRY(type) \ +struct { \ + struct type *spe_left; /* left element */ \ + struct type *spe_right; /* right element */ \ +} + +#define SPLAY_LEFT(elm, field) (elm)->field.spe_left +#define SPLAY_RIGHT(elm, field) (elm)->field.spe_right +#define SPLAY_ROOT(head) (head)->sph_root +#define SPLAY_EMPTY(head) (SPLAY_ROOT(head) == NULL) + +/* SPLAY_ROTATE_{LEFT,RIGHT} expect that tmp hold SPLAY_{RIGHT,LEFT} */ +#define SPLAY_ROTATE_RIGHT(head, tmp, field) do { \ + SPLAY_LEFT((head)->sph_root, field) = SPLAY_RIGHT(tmp, field); \ + SPLAY_RIGHT(tmp, field) = (head)->sph_root; \ + (head)->sph_root = tmp; \ +} while (/*CONSTCOND*/ 0) + +#define SPLAY_ROTATE_LEFT(head, tmp, field) do { \ + SPLAY_RIGHT((head)->sph_root, field) = SPLAY_LEFT(tmp, field); \ + SPLAY_LEFT(tmp, field) = (head)->sph_root; \ + (head)->sph_root = tmp; \ +} while (/*CONSTCOND*/ 0) + +#define SPLAY_LINKLEFT(head, tmp, field) do { \ + SPLAY_LEFT(tmp, field) = (head)->sph_root; \ + tmp = (head)->sph_root; \ + (head)->sph_root = SPLAY_LEFT((head)->sph_root, field); \ +} while (/*CONSTCOND*/ 0) + +#define SPLAY_LINKRIGHT(head, tmp, field) do { \ + SPLAY_RIGHT(tmp, field) = (head)->sph_root; \ + tmp = (head)->sph_root; \ + (head)->sph_root = SPLAY_RIGHT((head)->sph_root, field); \ +} while (/*CONSTCOND*/ 0) + +#define SPLAY_ASSEMBLE(head, node, left, right, field) do { \ + SPLAY_RIGHT(left, field) = SPLAY_LEFT((head)->sph_root, field); \ + SPLAY_LEFT(right, field) = SPLAY_RIGHT((head)->sph_root, field);\ + SPLAY_LEFT((head)->sph_root, field) = SPLAY_RIGHT(node, field); \ + SPLAY_RIGHT((head)->sph_root, field) = SPLAY_LEFT(node, field); \ +} while (/*CONSTCOND*/ 0) + +/* Generates prototypes and inline functions */ + +#define SPLAY_PROTOTYPE(name, type, field, cmp) \ +void name##_SPLAY(struct name *, struct type *); \ +void name##_SPLAY_MINMAX(struct name *, int); \ +struct type *name##_SPLAY_INSERT(struct name *, struct type *); \ +struct type *name##_SPLAY_REMOVE(struct name *, struct type *); \ + \ +/* Finds the node with the same key as elm */ \ +static __inline struct type * \ +name##_SPLAY_FIND(struct name *head, struct type *elm) \ +{ \ + if (SPLAY_EMPTY(head)) \ + return(NULL); \ + name##_SPLAY(head, elm); \ + if ((cmp)(elm, (head)->sph_root) == 0) \ + return (head->sph_root); \ + return (NULL); \ +} \ + \ +static __inline struct type * \ +name##_SPLAY_NEXT(struct name *head, struct type *elm) \ +{ \ + name##_SPLAY(head, elm); \ + if (SPLAY_RIGHT(elm, field) != NULL) { \ + elm = SPLAY_RIGHT(elm, field); \ + while (SPLAY_LEFT(elm, field) != NULL) { \ + elm = SPLAY_LEFT(elm, field); \ + } \ + } else \ + elm = NULL; \ + return (elm); \ +} \ + \ +static __inline struct type * \ +name##_SPLAY_MIN_MAX(struct name *head, int val) \ +{ \ + name##_SPLAY_MINMAX(head, val); \ + return (SPLAY_ROOT(head)); \ +} + +/* Main splay operation. 
+ * Moves node close to the key of elm to top + */ +#define SPLAY_GENERATE(name, type, field, cmp) \ +struct type * \ +name##_SPLAY_INSERT(struct name *head, struct type *elm) \ +{ \ + if (SPLAY_EMPTY(head)) { \ + SPLAY_LEFT(elm, field) = SPLAY_RIGHT(elm, field) = NULL; \ + } else { \ + int __comp; \ + name##_SPLAY(head, elm); \ + __comp = (cmp)(elm, (head)->sph_root); \ + if(__comp < 0) { \ + SPLAY_LEFT(elm, field) = SPLAY_LEFT((head)->sph_root, field);\ + SPLAY_RIGHT(elm, field) = (head)->sph_root; \ + SPLAY_LEFT((head)->sph_root, field) = NULL; \ + } else if (__comp > 0) { \ + SPLAY_RIGHT(elm, field) = SPLAY_RIGHT((head)->sph_root, field);\ + SPLAY_LEFT(elm, field) = (head)->sph_root; \ + SPLAY_RIGHT((head)->sph_root, field) = NULL; \ + } else \ + return ((head)->sph_root); \ + } \ + (head)->sph_root = (elm); \ + return (NULL); \ +} \ + \ +struct type * \ +name##_SPLAY_REMOVE(struct name *head, struct type *elm) \ +{ \ + struct type *__tmp; \ + if (SPLAY_EMPTY(head)) \ + return (NULL); \ + name##_SPLAY(head, elm); \ + if ((cmp)(elm, (head)->sph_root) == 0) { \ + if (SPLAY_LEFT((head)->sph_root, field) == NULL) { \ + (head)->sph_root = SPLAY_RIGHT((head)->sph_root, field);\ + } else { \ + __tmp = SPLAY_RIGHT((head)->sph_root, field); \ + (head)->sph_root = SPLAY_LEFT((head)->sph_root, field);\ + name##_SPLAY(head, elm); \ + SPLAY_RIGHT((head)->sph_root, field) = __tmp; \ + } \ + return (elm); \ + } \ + return (NULL); \ +} \ + \ +void \ +name##_SPLAY(struct name *head, struct type *elm) \ +{ \ + struct type __node, *__left, *__right, *__tmp; \ + int __comp; \ +\ + SPLAY_LEFT(&__node, field) = SPLAY_RIGHT(&__node, field) = NULL;\ + __left = __right = &__node; \ +\ + while ((__comp = (cmp)(elm, (head)->sph_root)) != 0) { \ + if (__comp < 0) { \ + __tmp = SPLAY_LEFT((head)->sph_root, field); \ + if (__tmp == NULL) \ + break; \ + if ((cmp)(elm, __tmp) < 0){ \ + SPLAY_ROTATE_RIGHT(head, __tmp, field); \ + if (SPLAY_LEFT((head)->sph_root, field) == NULL)\ + break; \ + } \ + SPLAY_LINKLEFT(head, __right, field); \ + } else if (__comp > 0) { \ + __tmp = SPLAY_RIGHT((head)->sph_root, field); \ + if (__tmp == NULL) \ + break; \ + if ((cmp)(elm, __tmp) > 0){ \ + SPLAY_ROTATE_LEFT(head, __tmp, field); \ + if (SPLAY_RIGHT((head)->sph_root, field) == NULL)\ + break; \ + } \ + SPLAY_LINKRIGHT(head, __left, field); \ + } \ + } \ + SPLAY_ASSEMBLE(head, &__node, __left, __right, field); \ +} \ + \ +/* Splay with either the minimum or the maximum element \ + * Used to find minimum or maximum element in tree. 
\ + */ \ +void name##_SPLAY_MINMAX(struct name *head, int __comp) \ +{ \ + struct type __node, *__left, *__right, *__tmp; \ +\ + SPLAY_LEFT(&__node, field) = SPLAY_RIGHT(&__node, field) = NULL;\ + __left = __right = &__node; \ +\ + while (1) { \ + if (__comp < 0) { \ + __tmp = SPLAY_LEFT((head)->sph_root, field); \ + if (__tmp == NULL) \ + break; \ + if (__comp < 0){ \ + SPLAY_ROTATE_RIGHT(head, __tmp, field); \ + if (SPLAY_LEFT((head)->sph_root, field) == NULL)\ + break; \ + } \ + SPLAY_LINKLEFT(head, __right, field); \ + } else if (__comp > 0) { \ + __tmp = SPLAY_RIGHT((head)->sph_root, field); \ + if (__tmp == NULL) \ + break; \ + if (__comp > 0) { \ + SPLAY_ROTATE_LEFT(head, __tmp, field); \ + if (SPLAY_RIGHT((head)->sph_root, field) == NULL)\ + break; \ + } \ + SPLAY_LINKRIGHT(head, __left, field); \ + } \ + } \ + SPLAY_ASSEMBLE(head, &__node, __left, __right, field); \ +} + +#define SPLAY_NEGINF -1 +#define SPLAY_INF 1 + +#define SPLAY_INSERT(name, x, y) name##_SPLAY_INSERT(x, y) +#define SPLAY_REMOVE(name, x, y) name##_SPLAY_REMOVE(x, y) +#define SPLAY_FIND(name, x, y) name##_SPLAY_FIND(x, y) +#define SPLAY_NEXT(name, x, y) name##_SPLAY_NEXT(x, y) +#define SPLAY_MIN(name, x) (SPLAY_EMPTY(x) ? NULL \ + : name##_SPLAY_MIN_MAX(x, SPLAY_NEGINF)) +#define SPLAY_MAX(name, x) (SPLAY_EMPTY(x) ? NULL \ + : name##_SPLAY_MIN_MAX(x, SPLAY_INF)) + +#define SPLAY_FOREACH(x, name, head) \ + for ((x) = SPLAY_MIN(name, head); \ + (x) != NULL; \ + (x) = SPLAY_NEXT(name, head, x)) + +/* Macros that define a red-black tree */ +#define RB_HEAD(name, type) \ +struct name { \ + struct type *rbh_root; /* root of the tree */ \ +} + +#define RB_INITIALIZER(root) \ + { NULL } + +#define RB_INIT(root) do { \ + (root)->rbh_root = NULL; \ +} while (/*CONSTCOND*/ 0) + +#define RB_BLACK 0 +#define RB_RED 1 +#define RB_ENTRY(type) \ +struct { \ + struct type *rbe_left; /* left element */ \ + struct type *rbe_right; /* right element */ \ + struct type *rbe_parent; /* parent element */ \ + int rbe_color; /* node color */ \ +} + +#define RB_LEFT(elm, field) (elm)->field.rbe_left +#define RB_RIGHT(elm, field) (elm)->field.rbe_right +#define RB_PARENT(elm, field) (elm)->field.rbe_parent +#define RB_COLOR(elm, field) (elm)->field.rbe_color +#define RB_ROOT(head) (head)->rbh_root +#define RB_EMPTY(head) (RB_ROOT(head) == NULL) + +#define RB_SET(elm, parent, field) do { \ + RB_PARENT(elm, field) = parent; \ + RB_LEFT(elm, field) = RB_RIGHT(elm, field) = NULL; \ + RB_COLOR(elm, field) = RB_RED; \ +} while (/*CONSTCOND*/ 0) + +#define RB_SET_BLACKRED(black, red, field) do { \ + RB_COLOR(black, field) = RB_BLACK; \ + RB_COLOR(red, field) = RB_RED; \ +} while (/*CONSTCOND*/ 0) + +#ifndef RB_AUGMENT +#define RB_AUGMENT(x) do {} while (0) +#endif + +#define RB_ROTATE_LEFT(head, elm, tmp, field) do { \ + (tmp) = RB_RIGHT(elm, field); \ + if ((RB_RIGHT(elm, field) = RB_LEFT(tmp, field)) != NULL) { \ + RB_PARENT(RB_LEFT(tmp, field), field) = (elm); \ + } \ + RB_AUGMENT(elm); \ + if ((RB_PARENT(tmp, field) = RB_PARENT(elm, field)) != NULL) { \ + if ((elm) == RB_LEFT(RB_PARENT(elm, field), field)) \ + RB_LEFT(RB_PARENT(elm, field), field) = (tmp); \ + else \ + RB_RIGHT(RB_PARENT(elm, field), field) = (tmp); \ + } else \ + (head)->rbh_root = (tmp); \ + RB_LEFT(tmp, field) = (elm); \ + RB_PARENT(elm, field) = (tmp); \ + RB_AUGMENT(tmp); \ + if ((RB_PARENT(tmp, field))) \ + RB_AUGMENT(RB_PARENT(tmp, field)); \ +} while (/*CONSTCOND*/ 0) + +#define RB_ROTATE_RIGHT(head, elm, tmp, field) do { \ + (tmp) = RB_LEFT(elm, field); \ + if 
((RB_LEFT(elm, field) = RB_RIGHT(tmp, field)) != NULL) { \ + RB_PARENT(RB_RIGHT(tmp, field), field) = (elm); \ + } \ + RB_AUGMENT(elm); \ + if ((RB_PARENT(tmp, field) = RB_PARENT(elm, field)) != NULL) { \ + if ((elm) == RB_LEFT(RB_PARENT(elm, field), field)) \ + RB_LEFT(RB_PARENT(elm, field), field) = (tmp); \ + else \ + RB_RIGHT(RB_PARENT(elm, field), field) = (tmp); \ + } else \ + (head)->rbh_root = (tmp); \ + RB_RIGHT(tmp, field) = (elm); \ + RB_PARENT(elm, field) = (tmp); \ + RB_AUGMENT(tmp); \ + if ((RB_PARENT(tmp, field))) \ + RB_AUGMENT(RB_PARENT(tmp, field)); \ +} while (/*CONSTCOND*/ 0) + +/* Generates prototypes and inline functions */ +#define RB_PROTOTYPE(name, type, field, cmp) \ + RB_PROTOTYPE_INTERNAL(name, type, field, cmp,) +#define RB_PROTOTYPE_STATIC(name, type, field, cmp) \ + RB_PROTOTYPE_INTERNAL(name, type, field, cmp, __unused static) +#define RB_PROTOTYPE_INTERNAL(name, type, field, cmp, attr) \ +attr void name##_RB_INSERT_COLOR(struct name *, struct type *); \ +attr void name##_RB_REMOVE_COLOR(struct name *, struct type *, struct type *);\ +attr struct type *name##_RB_REMOVE(struct name *, struct type *); \ +attr struct type *name##_RB_INSERT(struct name *, struct type *); \ +attr struct type *name##_RB_FIND(struct name *, struct type *); \ +attr struct type *name##_RB_NFIND(struct name *, struct type *); \ +attr struct type *name##_RB_NEXT(struct type *); \ +attr struct type *name##_RB_PREV(struct type *); \ +attr struct type *name##_RB_MINMAX(struct name *, int); \ + \ + +/* Main rb operation. + * Moves node close to the key of elm to top + */ +#define RB_GENERATE(name, type, field, cmp) \ + RB_GENERATE_INTERNAL(name, type, field, cmp,) +#define RB_GENERATE_STATIC(name, type, field, cmp) \ + RB_GENERATE_INTERNAL(name, type, field, cmp, __unused static) +#define RB_GENERATE_INTERNAL(name, type, field, cmp, attr) \ +attr void \ +name##_RB_INSERT_COLOR(struct name *head, struct type *elm) \ +{ \ + struct type *parent, *gparent, *tmp; \ + while ((parent = RB_PARENT(elm, field)) != NULL && \ + RB_COLOR(parent, field) == RB_RED) { \ + gparent = RB_PARENT(parent, field); \ + if (parent == RB_LEFT(gparent, field)) { \ + tmp = RB_RIGHT(gparent, field); \ + if (tmp && RB_COLOR(tmp, field) == RB_RED) { \ + RB_COLOR(tmp, field) = RB_BLACK; \ + RB_SET_BLACKRED(parent, gparent, field);\ + elm = gparent; \ + continue; \ + } \ + if (RB_RIGHT(parent, field) == elm) { \ + RB_ROTATE_LEFT(head, parent, tmp, field);\ + tmp = parent; \ + parent = elm; \ + elm = tmp; \ + } \ + RB_SET_BLACKRED(parent, gparent, field); \ + RB_ROTATE_RIGHT(head, gparent, tmp, field); \ + } else { \ + tmp = RB_LEFT(gparent, field); \ + if (tmp && RB_COLOR(tmp, field) == RB_RED) { \ + RB_COLOR(tmp, field) = RB_BLACK; \ + RB_SET_BLACKRED(parent, gparent, field);\ + elm = gparent; \ + continue; \ + } \ + if (RB_LEFT(parent, field) == elm) { \ + RB_ROTATE_RIGHT(head, parent, tmp, field);\ + tmp = parent; \ + parent = elm; \ + elm = tmp; \ + } \ + RB_SET_BLACKRED(parent, gparent, field); \ + RB_ROTATE_LEFT(head, gparent, tmp, field); \ + } \ + } \ + RB_COLOR(head->rbh_root, field) = RB_BLACK; \ +} \ + \ +attr void \ +name##_RB_REMOVE_COLOR(struct name *head, struct type *parent, struct type *elm) \ +{ \ + struct type *tmp; \ + while ((elm == NULL || RB_COLOR(elm, field) == RB_BLACK) && \ + elm != RB_ROOT(head)) { \ + if (RB_LEFT(parent, field) == elm) { \ + tmp = RB_RIGHT(parent, field); \ + if (RB_COLOR(tmp, field) == RB_RED) { \ + RB_SET_BLACKRED(tmp, parent, field); \ + RB_ROTATE_LEFT(head, parent, tmp, 
field);\ + tmp = RB_RIGHT(parent, field); \ + } \ + if ((RB_LEFT(tmp, field) == NULL || \ + RB_COLOR(RB_LEFT(tmp, field), field) == RB_BLACK) &&\ + (RB_RIGHT(tmp, field) == NULL || \ + RB_COLOR(RB_RIGHT(tmp, field), field) == RB_BLACK)) {\ + RB_COLOR(tmp, field) = RB_RED; \ + elm = parent; \ + parent = RB_PARENT(elm, field); \ + } else { \ + if (RB_RIGHT(tmp, field) == NULL || \ + RB_COLOR(RB_RIGHT(tmp, field), field) == RB_BLACK) {\ + struct type *oleft; \ + if ((oleft = RB_LEFT(tmp, field)) \ + != NULL) \ + RB_COLOR(oleft, field) = RB_BLACK;\ + RB_COLOR(tmp, field) = RB_RED; \ + RB_ROTATE_RIGHT(head, tmp, oleft, field);\ + tmp = RB_RIGHT(parent, field); \ + } \ + RB_COLOR(tmp, field) = RB_COLOR(parent, field);\ + RB_COLOR(parent, field) = RB_BLACK; \ + if (RB_RIGHT(tmp, field)) \ + RB_COLOR(RB_RIGHT(tmp, field), field) = RB_BLACK;\ + RB_ROTATE_LEFT(head, parent, tmp, field);\ + elm = RB_ROOT(head); \ + break; \ + } \ + } else { \ + tmp = RB_LEFT(parent, field); \ + if (RB_COLOR(tmp, field) == RB_RED) { \ + RB_SET_BLACKRED(tmp, parent, field); \ + RB_ROTATE_RIGHT(head, parent, tmp, field);\ + tmp = RB_LEFT(parent, field); \ + } \ + if ((RB_LEFT(tmp, field) == NULL || \ + RB_COLOR(RB_LEFT(tmp, field), field) == RB_BLACK) &&\ + (RB_RIGHT(tmp, field) == NULL || \ + RB_COLOR(RB_RIGHT(tmp, field), field) == RB_BLACK)) {\ + RB_COLOR(tmp, field) = RB_RED; \ + elm = parent; \ + parent = RB_PARENT(elm, field); \ + } else { \ + if (RB_LEFT(tmp, field) == NULL || \ + RB_COLOR(RB_LEFT(tmp, field), field) == RB_BLACK) {\ + struct type *oright; \ + if ((oright = RB_RIGHT(tmp, field)) \ + != NULL) \ + RB_COLOR(oright, field) = RB_BLACK;\ + RB_COLOR(tmp, field) = RB_RED; \ + RB_ROTATE_LEFT(head, tmp, oright, field);\ + tmp = RB_LEFT(parent, field); \ + } \ + RB_COLOR(tmp, field) = RB_COLOR(parent, field);\ + RB_COLOR(parent, field) = RB_BLACK; \ + if (RB_LEFT(tmp, field)) \ + RB_COLOR(RB_LEFT(tmp, field), field) = RB_BLACK;\ + RB_ROTATE_RIGHT(head, parent, tmp, field);\ + elm = RB_ROOT(head); \ + break; \ + } \ + } \ + } \ + if (elm) \ + RB_COLOR(elm, field) = RB_BLACK; \ +} \ + \ +attr struct type * \ +name##_RB_REMOVE(struct name *head, struct type *elm) \ +{ \ + struct type *child, *parent, *old = elm; \ + int color; \ + if (RB_LEFT(elm, field) == NULL) \ + child = RB_RIGHT(elm, field); \ + else if (RB_RIGHT(elm, field) == NULL) \ + child = RB_LEFT(elm, field); \ + else { \ + struct type *left; \ + elm = RB_RIGHT(elm, field); \ + while ((left = RB_LEFT(elm, field)) != NULL) \ + elm = left; \ + child = RB_RIGHT(elm, field); \ + parent = RB_PARENT(elm, field); \ + color = RB_COLOR(elm, field); \ + if (child) \ + RB_PARENT(child, field) = parent; \ + if (parent) { \ + if (RB_LEFT(parent, field) == elm) \ + RB_LEFT(parent, field) = child; \ + else \ + RB_RIGHT(parent, field) = child; \ + RB_AUGMENT(parent); \ + } else \ + RB_ROOT(head) = child; \ + if (RB_PARENT(elm, field) == old) \ + parent = elm; \ + (elm)->field = (old)->field; \ + if (RB_PARENT(old, field)) { \ + if (RB_LEFT(RB_PARENT(old, field), field) == old)\ + RB_LEFT(RB_PARENT(old, field), field) = elm;\ + else \ + RB_RIGHT(RB_PARENT(old, field), field) = elm;\ + RB_AUGMENT(RB_PARENT(old, field)); \ + } else \ + RB_ROOT(head) = elm; \ + RB_PARENT(RB_LEFT(old, field), field) = elm; \ + if (RB_RIGHT(old, field)) \ + RB_PARENT(RB_RIGHT(old, field), field) = elm; \ + if (parent) { \ + left = parent; \ + do { \ + RB_AUGMENT(left); \ + } while ((left = RB_PARENT(left, field)) != NULL); \ + } \ + goto color; \ + } \ + parent = RB_PARENT(elm, field); \ 
+ color = RB_COLOR(elm, field); \ + if (child) \ + RB_PARENT(child, field) = parent; \ + if (parent) { \ + if (RB_LEFT(parent, field) == elm) \ + RB_LEFT(parent, field) = child; \ + else \ + RB_RIGHT(parent, field) = child; \ + RB_AUGMENT(parent); \ + } else \ + RB_ROOT(head) = child; \ +color: \ + if (color == RB_BLACK) \ + name##_RB_REMOVE_COLOR(head, parent, child); \ + return (old); \ +} \ + \ +/* Inserts a node into the RB tree */ \ +attr struct type * \ +name##_RB_INSERT(struct name *head, struct type *elm) \ +{ \ + struct type *tmp; \ + struct type *parent = NULL; \ + int comp = 0; \ + tmp = RB_ROOT(head); \ + while (tmp) { \ + parent = tmp; \ + comp = (cmp)(elm, parent); \ + if (comp < 0) \ + tmp = RB_LEFT(tmp, field); \ + else if (comp > 0) \ + tmp = RB_RIGHT(tmp, field); \ + else \ + return (tmp); \ + } \ + RB_SET(elm, parent, field); \ + if (parent != NULL) { \ + if (comp < 0) \ + RB_LEFT(parent, field) = elm; \ + else \ + RB_RIGHT(parent, field) = elm; \ + RB_AUGMENT(parent); \ + } else \ + RB_ROOT(head) = elm; \ + name##_RB_INSERT_COLOR(head, elm); \ + return (NULL); \ +} \ + \ +/* Finds the node with the same key as elm */ \ +attr struct type * \ +name##_RB_FIND(struct name *head, struct type *elm) \ +{ \ + struct type *tmp = RB_ROOT(head); \ + int comp; \ + while (tmp) { \ + comp = cmp(elm, tmp); \ + if (comp < 0) \ + tmp = RB_LEFT(tmp, field); \ + else if (comp > 0) \ + tmp = RB_RIGHT(tmp, field); \ + else \ + return (tmp); \ + } \ + return (NULL); \ +} \ + \ +/* Finds the first node greater than or equal to the search key */ \ +attr struct type * \ +name##_RB_NFIND(struct name *head, struct type *elm) \ +{ \ + struct type *tmp = RB_ROOT(head); \ + struct type *res = NULL; \ + int comp; \ + while (tmp) { \ + comp = cmp(elm, tmp); \ + if (comp < 0) { \ + res = tmp; \ + tmp = RB_LEFT(tmp, field); \ + } \ + else if (comp > 0) \ + tmp = RB_RIGHT(tmp, field); \ + else \ + return (tmp); \ + } \ + return (res); \ +} \ + \ +/* ARGSUSED */ \ +attr struct type * \ +name##_RB_NEXT(struct type *elm) \ +{ \ + if (RB_RIGHT(elm, field)) { \ + elm = RB_RIGHT(elm, field); \ + while (RB_LEFT(elm, field)) \ + elm = RB_LEFT(elm, field); \ + } else { \ + if (RB_PARENT(elm, field) && \ + (elm == RB_LEFT(RB_PARENT(elm, field), field))) \ + elm = RB_PARENT(elm, field); \ + else { \ + while (RB_PARENT(elm, field) && \ + (elm == RB_RIGHT(RB_PARENT(elm, field), field)))\ + elm = RB_PARENT(elm, field); \ + elm = RB_PARENT(elm, field); \ + } \ + } \ + return (elm); \ +} \ + \ +/* ARGSUSED */ \ +attr struct type * \ +name##_RB_PREV(struct type *elm) \ +{ \ + if (RB_LEFT(elm, field)) { \ + elm = RB_LEFT(elm, field); \ + while (RB_RIGHT(elm, field)) \ + elm = RB_RIGHT(elm, field); \ + } else { \ + if (RB_PARENT(elm, field) && \ + (elm == RB_RIGHT(RB_PARENT(elm, field), field))) \ + elm = RB_PARENT(elm, field); \ + else { \ + while (RB_PARENT(elm, field) && \ + (elm == RB_LEFT(RB_PARENT(elm, field), field)))\ + elm = RB_PARENT(elm, field); \ + elm = RB_PARENT(elm, field); \ + } \ + } \ + return (elm); \ +} \ + \ +attr struct type * \ +name##_RB_MINMAX(struct name *head, int val) \ +{ \ + struct type *tmp = RB_ROOT(head); \ + struct type *parent = NULL; \ + while (tmp) { \ + parent = tmp; \ + if (val < 0) \ + tmp = RB_LEFT(tmp, field); \ + else \ + tmp = RB_RIGHT(tmp, field); \ + } \ + return (parent); \ +} + +#define RB_NEGINF -1 +#define RB_INF 1 + +#define RB_INSERT(name, x, y) name##_RB_INSERT(x, y) +#define RB_REMOVE(name, x, y) name##_RB_REMOVE(x, y) +#define RB_FIND(name, x, y) name##_RB_FIND(x, y) 
+#define RB_NFIND(name, x, y)  name##_RB_NFIND(x, y)
+#define RB_NEXT(name, x, y)   name##_RB_NEXT(y)
+#define RB_PREV(name, x, y)   name##_RB_PREV(y)
+#define RB_MIN(name, x)       name##_RB_MINMAX(x, RB_NEGINF)
+#define RB_MAX(name, x)       name##_RB_MINMAX(x, RB_INF)
+
+#define RB_FOREACH(x, name, head)                                      \
+       for ((x) = RB_MIN(name, head);                                  \
+            (x) != NULL;                                               \
+            (x) = name##_RB_NEXT(x))
+
+#define RB_FOREACH_FROM(x, name, y)                                    \
+       for ((x) = (y);                                                 \
+           ((x) != NULL) && ((y) = name##_RB_NEXT(x), (x) != NULL);    \
+            (x) = (y))
+
+#define RB_FOREACH_SAFE(x, name, head, y)                              \
+       for ((x) = RB_MIN(name, head);                                  \
+           ((x) != NULL) && ((y) = name##_RB_NEXT(x), (x) != NULL);    \
+            (x) = (y))
+
+#define RB_FOREACH_REVERSE(x, name, head)                              \
+       for ((x) = RB_MAX(name, head);                                  \
+            (x) != NULL;                                               \
+            (x) = name##_RB_PREV(x))
+
+#define RB_FOREACH_REVERSE_FROM(x, name, y)                            \
+       for ((x) = (y);                                                 \
+           ((x) != NULL) && ((y) = name##_RB_PREV(x), (x) != NULL);    \
+            (x) = (y))
+
+#define RB_FOREACH_REVERSE_SAFE(x, name, head, y)                      \
+       for ((x) = RB_MAX(name, head);                                  \
+           ((x) != NULL) && ((y) = name##_RB_PREV(x), (x) != NULL);    \
+            (x) = (y))
+
+#endif /* _SYS_TREE_H_ */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 27f3b9e522..00c5e8bc44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -909,4 +909,25 @@
   </description>
 </property>
 
+<property>
+  <name>hadoop.fuse.connection.timeout</name>
+  <value>300</value>
+  <description>
+    The minimum number of seconds that we'll cache libhdfs connection objects
+    in fuse_dfs. Lower values will result in lower memory consumption; higher
+    values may speed up access by avoiding the overhead of creating new
+    connection objects.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.fuse.timer.period</name>
+  <value>5</value>
+  <description>
+    The number of seconds between cache expiry checks in fuse_dfs. Lower values
+    will result in fuse_dfs noticing changes to Kerberos ticket caches more
+    quickly.
+  </description>
+</property>
+
 </configuration>
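
For reference, a minimal sketch of how the hdfsConfGetStr/hdfsConfGetInt/hdfsConfStrFree API introduced in hdfs.h above can be consumed, reading the configuration keys this patch defines. The function signatures and keys come from the patch itself; the wrapper function printFuseConfig is hypothetical and not part of the change.

#include <errno.h>
#include <stdint.h>
#include <stdio.h>

#include "libhdfs/hdfs.h"

/* Hypothetical helper: print the fuse_dfs tunables added above. */
static void printFuseConfig(void)
{
    char *auth = NULL;
    int32_t timerPeriod = 5; /* seed the documented default */

    /* A missing key is not an error; *val is set to NULL instead. */
    if (hdfsConfGetStr("hadoop.security.authentication", &auth)) {
        fprintf(stderr, "hdfsConfGetStr failed: errno=%d\n", errno);
        return;
    }
    /* hdfsConfGetInt leaves *val untouched when the key is absent, so
     * the caller's seed value acts as the default (it is also passed to
     * Configuration#getInt as the fallback). */
    if (hdfsConfGetInt("hadoop.fuse.timer.period", &timerPeriod)) {
        fprintf(stderr, "hdfsConfGetInt failed: errno=%d\n", errno);
    }
    printf("auth=%s, timer period=%d s\n",
           auth ? auth : "(unset)", timerPeriod);
    hdfsConfStrFree(auth); /* wraps free(), so NULL is safe */
}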
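The red-black tree description in tree.h is easiest to absorb next to a concrete instantiation. The sketch below shows the usual tree.h pattern (an embedded RB_ENTRY, a comparator, RB_GENERATE); the node type and names here are hypothetical, though fuse_connect.c wires up its connection cache in the same way.

#include <stdio.h>
#include <string.h>

#include "util/tree.h"

/* Hypothetical node type, keyed by username. */
struct conn {
    RB_ENTRY(conn) entry;  /* embeds the left/right/parent/color links */
    const char *usrname;
};

/* Comparator: must impose a total order; returns <0, 0, or >0. */
static int connCompare(const struct conn *a, const struct conn *b)
{
    return strcmp(a->usrname, b->usrname);
}

RB_HEAD(connTree, conn);                         /* declares struct connTree */
RB_GENERATE(connTree, conn, entry, connCompare); /* emits insert/find/etc.   */

int main(void)
{
    struct connTree head = RB_INITIALIZER(&head);
    struct conn a = { .usrname = "alice" };
    struct conn b = { .usrname = "bob" };
    struct conn key = { .usrname = "alice" };
    struct conn *iter;

    RB_INSERT(connTree, &head, &a); /* returns NULL, or an existing duplicate */
    RB_INSERT(connTree, &head, &b);

    iter = RB_FIND(connTree, &head, &key);
    printf("found: %s\n", iter ? iter->usrname : "(none)");

    RB_FOREACH(iter, connTree, &head)   /* in-order traversal */
        printf("%s\n", iter->usrname);

    RB_REMOVE(connTree, &head, &a);
    return 0;
}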
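The splay-tree half of the header follows the same pattern under SPLAY_* names. fuse_dfs does not use it, but a brief sketch (again with hypothetical names) is included for completeness: SPLAY_PROTOTYPE generates the inline lookup helpers and SPLAY_GENERATE the out-of-line splay operations.

#include "util/tree.h"

struct node {
    SPLAY_ENTRY(node) link;
    int key;
};

static int nodeCompare(struct node *a, struct node *b)
{
    return (a->key < b->key) ? -1 : (a->key > b->key);
}

SPLAY_HEAD(nodeTree, node);
SPLAY_PROTOTYPE(nodeTree, node, link, nodeCompare);
SPLAY_GENERATE(nodeTree, node, link, nodeCompare);

/* Every successful lookup splays the match to the root, which is why the
 * header comment warns that even reads cause memory writes. */
static struct node *findKey(struct nodeTree *head, int key)
{
    struct node query = { .key = key };
    return SPLAY_FIND(nodeTree, head, &query);
}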