diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index eb9563e859..ba1984afa4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -622,6 +622,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4315. DNs with multiple BPs can have BPOfferServices fail to start due to unsynchronized map access. (atm) + HDFS-4140. fuse-dfs handles open(O_TRUNC) poorly. (Colin Patrick McCabe + via atm) + BREAKDOWN OF HDFS-3077 SUBTASKS HDFS-3077. Quorum-based protocol for reading and writing edit logs. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c index 2a39d85263..218c5c9ab2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c @@ -131,7 +131,6 @@ static enum authConf discoverAuthConf(void) int fuseConnectInit(const char *nnUri, int port) { - const char *timerPeriod; int ret; gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c index ecd772f63f..9ca2650ddd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c @@ -24,12 +24,77 @@ #include #include +static int get_hdfs_open_flags_from_info(hdfsFS fs, const char *path, + int flags, int *outflags, const hdfsFileInfo *info); + +/** + * Given a set of FUSE flags, determine the libhdfs flags we need. + * + * This is complicated by two things: + * 1. libhdfs doesn't support O_RDWR at all; + * 2. when given O_WRONLY, libhdfs will truncate the file unless O_APPEND is + * also given. In other words, there is an implicit O_TRUNC. + * + * Probably the next iteration of the libhdfs interface should not use the POSIX + * flags at all, since, as you can see, they don't really match up very closely + * to the POSIX meaning. However, for the time being, this is the API. + * + * @param fs The libhdfs object + * @param path The path we're opening + * @param flags The FUSE flags + * + * @return negative error code on failure; flags otherwise. + */ +static int64_t get_hdfs_open_flags(hdfsFS fs, const char *path, int flags) +{ + int hasContent; + int64_t ret; + hdfsFileInfo *info; + + if ((flags & O_ACCMODE) == O_RDONLY) { + return O_RDONLY; + } + if (flags & O_TRUNC) { + /* If we're opening for write or read/write, O_TRUNC means we should blow + * away the file which is there and create our own file. + * */ + return O_WRONLY; + } + info = hdfsGetPathInfo(fs, path); + if (info) { + if (info->mSize == 0) { + // If the file has zero length, we shouldn't feel bad about blowing it + // away. + ret = O_WRONLY; + } else if ((flags & O_ACCMODE) == O_RDWR) { + // HACK: translate O_RDWR requests into O_RDONLY if the file already + // exists and has non-zero length. + ret = O_RDONLY; + } else { // O_WRONLY + // HACK: translate O_WRONLY requests into append if the file already + // exists. + ret = O_WRONLY | O_APPEND; + } + } else { // !info + if (flags & O_CREAT) { + ret = O_WRONLY; + } else { + ret = -ENOENT; + } + } + if (info) { + hdfsFreeFileInfo(info, 1); + } + return ret; +} + int dfs_open(const char *path, struct fuse_file_info *fi) { hdfsFS fs = NULL; dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; dfs_fh *fh = NULL; - int mutexInit = 0, ret; + int mutexInit = 0, ret, flags = 0; + int64_t flagRet; TRACE1("open", path) @@ -38,10 +103,6 @@ int dfs_open(const char *path, struct fuse_file_info *fi) assert('/' == *path); assert(dfs); - // 0x8000 is always passed in and hadoop doesn't like it, so killing it here - // bugbug figure out what this flag is and report problem to Hadoop JIRA - int flags = (fi->flags & 0x7FFF); - // retrieve dfs specific data fh = (dfs_fh*)calloc(1, sizeof (dfs_fh)); if (!fh) { @@ -57,22 +118,12 @@ int dfs_open(const char *path, struct fuse_file_info *fi) goto error; } fs = hdfsConnGetFs(fh->conn); - - if (flags & O_RDWR) { - hdfsFileInfo *info = hdfsGetPathInfo(fs, path); - if (info == NULL) { - // File does not exist (maybe?); interpret it as a O_WRONLY - // If the actual error was something else, we'll get it again when - // we try to open the file. - flags ^= O_RDWR; - flags |= O_WRONLY; - } else { - // File exists; open this as read only. - flags ^= O_RDWR; - flags |= O_RDONLY; - } + flagRet = get_hdfs_open_flags(fs, path, fi->flags); + if (flagRet < 0) { + ret = -flagRet; + goto error; } - + flags = flagRet; if ((fh->hdfsFH = hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) { ERROR("Could not open file %s (errno=%d)", path, errno); if (errno == 0 || errno == EINTERNAL) { @@ -91,7 +142,7 @@ int dfs_open(const char *path, struct fuse_file_info *fi) } mutexInit = 1; - if (fi->flags & O_WRONLY || fi->flags & O_CREAT) { + if ((flags & O_ACCMODE) == O_WRONLY) { fh->buf = NULL; } else { assert(dfs->rdbuffer_size > 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c index 1ec11c1a0b..4da6da0fa9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c @@ -98,7 +98,7 @@ static void dfsPrintOptions(FILE *fp, const struct options *o) o->attribute_timeout, o->rdbuffer_size, o->direct_io); } -void *dfs_init(void) +void *dfs_init(struct fuse_conn_info *conn) { int ret; @@ -143,6 +143,45 @@ void *dfs_init(void) exit(EXIT_FAILURE); } } + +#ifdef FUSE_CAP_ATOMIC_O_TRUNC + // If FUSE_CAP_ATOMIC_O_TRUNC is set, open("foo", O_CREAT | O_TRUNC) will + // result in dfs_open being called with O_TRUNC. + // + // If this capability is not present, fuse will try to use multiple + // operation to "simulate" open(O_TRUNC). This doesn't work very well with + // HDFS. + // Unfortunately, this capability is only implemented on Linux 2.6.29 or so. + // See HDFS-4140 for details. + if (conn->capable & FUSE_CAP_ATOMIC_O_TRUNC) { + conn->want |= FUSE_CAP_ATOMIC_O_TRUNC; + } +#endif + +#ifdef FUSE_CAP_ASYNC_READ + // We're OK with doing reads at the same time as writes. + if (conn->capable & FUSE_CAP_ASYNC_READ) { + conn->want |= FUSE_CAP_ASYNC_READ; + } +#endif + +#ifdef FUSE_CAP_BIG_WRITES + // Yes, we can read more than 4kb at a time. In fact, please do! + if (conn->capable & FUSE_CAP_BIG_WRITES) { + conn->want |= FUSE_CAP_BIG_WRITES; + } +#endif + +#ifdef FUSE_CAP_DONT_MASK + if ((options.no_permissions) && (conn->capable & FUSE_CAP_DONT_MASK)) { + // If we're handing permissions ourselves, we don't want the kernel + // applying its own umask. HDFS already implements its own per-user + // umasks! Sadly, this only actually does something on kernels 2.6.31 and + // later. + conn->want |= FUSE_CAP_DONT_MASK; + } +#endif + return (void*)dfs; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.h index 6f17af8af2..681ab912da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.h @@ -19,13 +19,15 @@ #ifndef __FUSE_INIT_H__ #define __FUSE_INIT_H__ +struct fuse_conn_info; + /** * These are responsible for initializing connections to dfs and internal * data structures and then freeing them. * i.e., what happens on mount and unmount. * */ -void *dfs_init(); +void *dfs_init(struct fuse_conn_info *conn); void dfs_destroy (void *ptr); #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c index 9252ead1bb..78fdbc66f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c @@ -16,6 +16,8 @@ * limitations under the License. */ +#define FUSE_USE_VERSION 26 + #include "fuse-dfs/test/fuse_workload.h" #include "libhdfs/expect.h" #include "util/posix_util.h" @@ -23,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -138,13 +141,89 @@ static int safeRead(int fd, void *buf, int c) return amt; } +/* Bug: HDFS-2551. + * When a program writes a file, closes it, and immediately re-opens it, + * it might not appear to have the correct length. This is because FUSE + * invokes the release() callback asynchronously. + * + * To work around this, we keep retrying until the file length is what we + * expect. + */ +static int closeWorkaroundHdfs2551(int fd, const char *path, off_t expectedSize) +{ + int ret, try; + struct stat stBuf; + + RETRY_ON_EINTR_GET_ERRNO(ret, close(fd)); + EXPECT_ZERO(ret); + for (try = 0; try < MAX_TRIES; try++) { + EXPECT_ZERO(stat(path, &stBuf)); + EXPECT_NONZERO(S_ISREG(stBuf.st_mode)); + if (stBuf.st_size == expectedSize) { + return 0; + } + sleepNoSig(1); + } + fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length " + "%lld; instead, it had length %lld\n", + path, (long long)expectedSize, (long long)stBuf.st_size); + return -EIO; +} + +#ifdef FUSE_CAP_ATOMIC_O_TRUNC + +/** + * Test that we can create a file, write some contents to it, close that file, + * and then successfully re-open with O_TRUNC. + */ +static int testOpenTrunc(const char *base) +{ + int fd, err; + char path[PATH_MAX]; + const char * const SAMPLE1 = "this is the first file that we wrote."; + const char * const SAMPLE2 = "this is the second file that we wrote. " + "It's #2!"; + + snprintf(path, sizeof(path), "%s/trunc.txt", base); + fd = open(path, O_CREAT | O_TRUNC | O_WRONLY, 0644); + if (fd < 0) { + err = errno; + fprintf(stderr, "TEST_ERROR: testOpenTrunc(%s): first open " + "failed with error %d\n", path, err); + return -err; + } + EXPECT_ZERO(safeWrite(fd, SAMPLE1, strlen(SAMPLE1))); + EXPECT_ZERO(closeWorkaroundHdfs2551(fd, path, strlen(SAMPLE1))); + fd = open(path, O_CREAT | O_TRUNC | O_WRONLY, 0644); + if (fd < 0) { + err = errno; + fprintf(stderr, "TEST_ERROR: testOpenTrunc(%s): second open " + "failed with error %d\n", path, err); + return -err; + } + EXPECT_ZERO(safeWrite(fd, SAMPLE2, strlen(SAMPLE2))); + EXPECT_ZERO(closeWorkaroundHdfs2551(fd, path, strlen(SAMPLE2))); + return 0; +} + +#else + +static int testOpenTrunc(const char *base) +{ + fprintf(stderr, "FUSE_WORKLOAD: We lack FUSE_CAP_ATOMIC_O_TRUNC support. " + "Not testing open(O_TRUNC).\n"); + return 0; +} + +#endif + int runFuseWorkloadImpl(const char *root, const char *pcomp, struct fileCtx *ctx) { char base[PATH_MAX], tmp[PATH_MAX], *tmpBuf; char src[PATH_MAX], dst[PATH_MAX]; struct stat stBuf; - int ret, i, try; + int ret, i; struct utimbuf tbuf; struct statvfs stvBuf; @@ -241,34 +320,9 @@ int runFuseWorkloadImpl(const char *root, const char *pcomp, EXPECT_ZERO(safeWrite(ctx[i].fd, ctx[i].str, ctx[i].strLen)); } for (i = 0; i < NUM_FILE_CTX; i++) { - RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd)); - EXPECT_ZERO(ret); + EXPECT_ZERO(closeWorkaroundHdfs2551(ctx[i].fd, ctx[i].path, ctx[i].strLen)); ctx[i].fd = -1; } - for (i = 0; i < NUM_FILE_CTX; i++) { - /* Bug: HDFS-2551. - * When a program writes a file, closes it, and immediately re-opens it, - * it might not appear to have the correct length. This is because FUSE - * invokes the release() callback asynchronously. - * - * To work around this, we keep retrying until the file length is what we - * expect. - */ - for (try = 0; try < MAX_TRIES; try++) { - EXPECT_ZERO(stat(ctx[i].path, &stBuf)); - EXPECT_NONZERO(S_ISREG(stBuf.st_mode)); - if (ctx[i].strLen == stBuf.st_size) { - break; - } - sleepNoSig(1); - } - if (try == MAX_TRIES) { - fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length " - "%d; instead, it had length %lld\n", - ctx[i].path, ctx[i].strLen, (long long)stBuf.st_size); - return -EIO; - } - } for (i = 0; i < NUM_FILE_CTX; i++) { ctx[i].fd = open(ctx[i].path, O_RDONLY); if (ctx[i].fd < 0) { @@ -308,6 +362,7 @@ int runFuseWorkloadImpl(const char *root, const char *pcomp, for (i = 0; i < NUM_FILE_CTX; i++) { free(ctx[i].path); } + EXPECT_ZERO(testOpenTrunc(base)); EXPECT_ZERO(recursiveDelete(base)); return 0; }