diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4a86a202f7..2e834582a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -319,6 +319,9 @@ Branch-2 ( Unreleased changes ) HDFS-3612. Single namenode image directory config warning can be improved. (Andy Isaacson via harsh) + HDFS-3606. libhdfs: create self-contained unit test. + (Colin Patrick McCabe via eli) + OPTIMIZATIONS HDFS-2982. Startup performance suffers when there are many edit log diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 41bcf31baf..02d9857fe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -422,18 +422,23 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt index 63193ce84e..d3c93f0e5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt @@ -132,4 +132,28 @@ target_link_libraries(hdfs_write ) output_directory(hdfs_write target/usr/local/bin) +add_library(native_mini_dfs + main/native/native_mini_dfs.c +) +target_link_libraries(native_mini_dfs + hdfs +) + +add_executable(test_native_mini_dfs + main/native/test_native_mini_dfs.c +) +target_link_libraries(test_native_mini_dfs + native_mini_dfs +) + +add_executable(test_libhdfs_threaded + main/native/test_libhdfs_threaded.c +) +target_link_libraries(test_libhdfs_threaded + hdfs + native_mini_dfs + pthread +) + add_subdirectory(contrib/fuse-dfs/src) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/expect.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/expect.h new file mode 100644 index 0000000000..2046bd01e6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/expect.h @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H +#define LIBHDFS_NATIVE_TESTS_EXPECT_H + +#include + +#define EXPECT_ZERO(x) \ + do { \ + int __my_ret__ = x; \ + if (__my_ret__) { \ + int __my_errno__ = errno; \ + fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + "code %d (errno: %d): got nonzero from %s\n", \ + __LINE__, __my_ret__, __my_errno__, #x); \ + return __my_ret__; \ + } \ + } while (0); + +#define EXPECT_NULL(x) \ + do { \ + void* __my_ret__ = x; \ + int __my_errno__ = errno; \ + if (__my_ret__ != NULL) { \ + fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \ + "got non-NULL value %p from %s\n", \ + __LINE__, __my_errno__, __my_ret__, #x); \ + return -1; \ + } \ + } while (0); + +#define EXPECT_NONNULL(x) \ + do { \ + void* __my_ret__ = x; \ + int __my_errno__ = errno; \ + if (__my_ret__ == NULL) { \ + fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \ + "got NULL from %s\n", __LINE__, __my_errno__, #x); \ + return -1; \ + } \ + } while (0); + +#define EXPECT_NEGATIVE_ONE_WITH_ERRNO(x, e) \ + do { \ + int __my_ret__ = x; \ + int __my_errno__ = errno; \ + if (__my_ret__ != -1) { \ + fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + "code %d (errno: %d): expected -1 from %s\n", __LINE__, \ + __my_ret__, __my_errno__, #x); \ + return -1; \ + } \ + if (__my_errno__ != e) { \ + fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + "code %d (errno: %d): expected errno = %d from %s\n", \ + __LINE__, __my_ret__, __my_errno__, e, #x); \ + return -1; \ + } \ + } while (0); + +#define EXPECT_NONZERO(x) \ + do { \ + int __my_ret__ = x; \ + int __my_errno__ = errno; \ + if (__my_ret__) { \ + fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + "code %d (errno: %d): got zero from %s\n", __LINE__, \ + __my_ret__, __my_errno__, #x); \ + return -1; \ + } \ + } while (0); + +#define EXPECT_NONNEGATIVE(x) \ + do { \ + int __my_ret__ = x; \ + int __my_errno__ = errno; \ + if (__my_ret__ < 0) { \ + fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + "code %d (errno: %d): got negative return from %s\n", \ + __LINE__, __my_ret__, __my_errno__, #x); \ + return __my_ret__; \ + } \ + } while (0); + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/native_mini_dfs.c new file mode 100644 index 0000000000..63139e620b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/native_mini_dfs.c @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfsJniHelper.h" +#include "native_mini_dfs.h" + +#include +#include +#include +#include + +#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder" +#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster" +#define HADOOP_CONF "org/apache/hadoop/conf/Configuration" + +struct NativeMiniDfsCluster { + /** + * The NativeMiniDfsCluster object + */ + jobject obj; +}; + +struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) +{ + struct NativeMiniDfsCluster* cl = NULL; + jobject bld = NULL, bld2 = NULL, cobj = NULL; + jvalue val; + JNIEnv *env = getJNIEnv(); + if (!env) { + fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n"); + goto error; + } + cl = calloc(1, sizeof(struct NativeMiniDfsCluster)); + if (!cl) { + fprintf(stderr, "nmdCreate: OOM"); + goto error; + } + cobj = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); + if (!cobj) { + fprintf(stderr, "nmdCreate: unable to construct Configuration\n"); + goto error_free_cl; + } + bld = constructNewObjectOfClass(env, NULL, MINIDFS_CLUSTER_BUILDER, + "(L"HADOOP_CONF";)V", cobj); + if (!bld) { + fprintf(stderr, "nmdCreate: unable to construct " + "NativeMiniDfsCluster#Builder\n"); + goto error_dlr_cobj; + } + if (invokeMethod(env, &val, NULL, INSTANCE, bld, + MINIDFS_CLUSTER_BUILDER, "format", + "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat)) { + fprintf(stderr, "nmdCreate: failed to call Builder#doFormat\n"); + goto error_dlr_bld; + } + bld2 = val.l; + if (invokeMethod(env, &val, NULL, INSTANCE, bld, + MINIDFS_CLUSTER_BUILDER, "build", + "()L" MINIDFS_CLUSTER ";")) { + fprintf(stderr, "nmdCreate: failed to call Builder#build\n"); + goto error_dlr_bld2; + } + cl->obj = (*env)->NewGlobalRef(env, val.l); + if (!cl->obj) { + fprintf(stderr, "nmdCreate: failed to create global reference to " + "MiniDFSCluster\n"); + goto error_dlr_val; + } + (*env)->DeleteLocalRef(env, val.l); + (*env)->DeleteLocalRef(env, bld2); + (*env)->DeleteLocalRef(env, bld); + (*env)->DeleteLocalRef(env, cobj); + return cl; + +error_dlr_val: + (*env)->DeleteLocalRef(env, val.l); +error_dlr_bld2: + (*env)->DeleteLocalRef(env, bld2); +error_dlr_bld: + (*env)->DeleteLocalRef(env, bld); +error_dlr_cobj: + (*env)->DeleteLocalRef(env, cobj); +error_free_cl: + free(cl); +error: + return NULL; +} + +void nmdFree(struct NativeMiniDfsCluster* cl) +{ + JNIEnv *env = getJNIEnv(); + if (!env) { + fprintf(stderr, "nmdFree: getJNIEnv failed\n"); + free(cl); + return; + } + (*env)->DeleteGlobalRef(env, cl->obj); + free(cl); +} + +int nmdShutdown(struct NativeMiniDfsCluster* cl) +{ + JNIEnv *env = getJNIEnv(); + if (!env) { + fprintf(stderr, "nmdShutdown: getJNIEnv failed\n"); + return -EIO; + } + if (invokeMethod(env, NULL, NULL, INSTANCE, cl->obj, + MINIDFS_CLUSTER, "shutdown", "()V")) { + fprintf(stderr, "nmdShutdown: MiniDFSCluster#shutdown failure\n"); + return -EIO; + } + return 0; +} + +int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl) +{ + JNIEnv *env = getJNIEnv(); + if (!env) { + fprintf(stderr, "nmdWaitClusterUp: getJNIEnv failed\n"); + return -EIO; + } + if (invokeMethod(env, NULL, NULL, INSTANCE, cl->obj, + MINIDFS_CLUSTER, "waitClusterUp", "()V")) { + fprintf(stderr, "nmdWaitClusterUp: MiniDFSCluster#waitClusterUp " + "failure\n"); + return -EIO; + } + return 0; +} + +int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl) +{ + JNIEnv *env = getJNIEnv(); + jvalue jVal; + + if (!env) { + fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n"); + return -EIO; + } + // Note: this will have to be updated when HA nativeMiniDfs clusters are + // supported + if (invokeMethod(env, &jVal, NULL, INSTANCE, cl->obj, + MINIDFS_CLUSTER, "getNameNodePort", "()I")) { + fprintf(stderr, "nmdHdfsConnect: MiniDFSCluster#getNameNodePort " + "failure\n"); + return -EIO; + } + return jVal.i; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/native_mini_dfs.h new file mode 100644 index 0000000000..88a4b47beb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/native_mini_dfs.h @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBHDFS_NATIVE_MINI_DFS_H +#define LIBHDFS_NATIVE_MINI_DFS_H + +#include /* for jboolean */ + +struct NativeMiniDfsCluster; + +/** + * Represents a configuration to use for creating a Native MiniDFSCluster + */ +struct NativeMiniDfsConf { + /** + * Nonzero if the cluster should be formatted prior to startup + */ + jboolean doFormat; +}; + +/** + * Create a NativeMiniDfsBuilder + * + * @param conf (inout) The cluster configuration + * + * @return a NativeMiniDfsBuilder, or a NULL pointer on error. + */ +struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf); + +/** + * Wait until a MiniDFSCluster comes out of safe mode. + * + * @param cl The cluster + * + * @return 0 on success; a non-zero error code if the cluster fails to + * come out of safe mode. + */ +int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl); + +/** + * Shut down a NativeMiniDFS cluster + * + * @param cl The cluster + * + * @return 0 on success; a non-zero error code if an exception is + * thrown. + */ +int nmdShutdown(struct NativeMiniDfsCluster *cl); + +/** + * Destroy a Native MiniDFSCluster + * + * @param cl The cluster to destroy + */ +void nmdFree(struct NativeMiniDfsCluster* cl); + +/** + * Get the port that's in use by the given (non-HA) nativeMiniDfs + * + * @param cl The initialized NativeMiniDfsCluster + * + * @return the port, or a negative error code + */ +int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl); + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/test_libhdfs_threaded.c new file mode 100644 index 0000000000..5c404266ad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/test_libhdfs_threaded.c @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs.h" +#include "native_mini_dfs.h" + +#include +#include +#include +#include +#include + +#define TLH_MAX_THREADS 100 + +static sem_t tlhSem; + +static struct NativeMiniDfsCluster* tlhCluster; + +struct tlhThreadInfo { + /** Thread index */ + int threadIdx; + /** 0 = thread was successful; error code otherwise */ + int success; + /** pthread identifier */ + pthread_t thread; +}; + +static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs) +{ + int ret, port; + hdfsFS hdfs; + + port = nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort " + "returned error %d\n", port); + return port; + } + hdfs = hdfsConnectNewInstance("localhost", port); + if (!hdfs) { + ret = -errno; + return ret; + } + *fs = hdfs; + return 0; +} + +static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) +{ + char prefix[256], tmp[256]; + hdfsFile file; + int ret, expected; + + snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx); + + if (hdfsExists(fs, prefix) == 0) { + EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); + } + EXPECT_ZERO(hdfsCreateDirectory(fs, prefix)); + snprintf(tmp, sizeof(tmp), "%s/file", prefix); + + /* There should not be any file to open for reading. */ + EXPECT_NULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0)); + + file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0); + EXPECT_NONNULL(file); + + /* TODO: implement writeFully and use it here */ + expected = strlen(prefix); + ret = hdfsWrite(fs, file, prefix, expected); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret); + return ret; + } + if (ret != expected) { + fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but " + "it wrote %d\n", ret, expected); + return EIO; + } + EXPECT_ZERO(hdfsFlush(fs, file)); + EXPECT_ZERO(hdfsCloseFile(fs, file)); + + /* Let's re-open the file for reading */ + file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + + /* TODO: implement readFully and use it here */ + ret = hdfsRead(fs, file, tmp, sizeof(tmp)); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsRead failed and set errno %d\n", ret); + return ret; + } + if (ret != expected) { + fprintf(stderr, "hdfsRead was supposed to read %d bytes, but " + "it read %d\n", ret, expected); + return EIO; + } + EXPECT_ZERO(memcmp(prefix, tmp, expected)); + EXPECT_ZERO(hdfsCloseFile(fs, file)); + + // TODO: Non-recursive delete should fail? + //EXPECT_NONZERO(hdfsDelete(fs, prefix, 0)); + + EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); + return 0; +} + +static void *testHdfsOperations(void *v) +{ + struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v; + hdfsFS fs = NULL; + int ret; + + fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", + ti->threadIdx); + ret = hdfsSingleNameNodeConnect(tlhCluster, &fs); + if (ret) { + fprintf(stderr, "testHdfsOperations(threadIdx=%d): " + "hdfsSingleNameNodeConnect failed with error %d.\n", + ti->threadIdx, ret); + ti->success = EIO; + return NULL; + } + ti->success = doTestHdfsOperations(ti, fs); + if (hdfsDisconnect(fs)) { + ret = errno; + fprintf(stderr, "hdfsDisconnect error %d\n", ret); + ti->success = ret; + } + return NULL; +} + +static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads) +{ + int i, threadsFailed = 0; + const char *sep = ""; + + for (i = 0; i < tlhNumThreads; i++) { + if (ti[i].success != 0) { + threadsFailed = 1; + } + } + if (!threadsFailed) { + fprintf(stderr, "testLibHdfs: all threads succeeded. SUCCESS.\n"); + return EXIT_SUCCESS; + } + fprintf(stderr, "testLibHdfs: some threads failed: ["); + for (i = 0; i < tlhNumThreads; i++) { + if (ti[i].success != 0) { + fprintf(stderr, "%s%d", sep, i); + sep = ", "; + } + } + fprintf(stderr, "]. FAILURE.\n"); + return EXIT_FAILURE; +} + +/** + * Test that we can write a file with libhdfs and then read it back + */ +int main(void) +{ + int i, tlhNumThreads; + const char *tlhNumThreadsStr; + struct tlhThreadInfo ti[TLH_MAX_THREADS]; + struct NativeMiniDfsConf conf = { + .doFormat = 1, + }; + + tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); + if (!tlhNumThreadsStr) { + tlhNumThreadsStr = "3"; + } + tlhNumThreads = atoi(tlhNumThreadsStr); + if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) { + fprintf(stderr, "testLibHdfs: must have a number of threads " + "between 1 and %d inclusive, not %d\n", + TLH_MAX_THREADS, tlhNumThreads); + return EXIT_FAILURE; + } + memset(&ti[0], 0, sizeof(ti)); + for (i = 0; i < tlhNumThreads; i++) { + ti[i].threadIdx = i; + } + + EXPECT_ZERO(sem_init(&tlhSem, 0, tlhNumThreads)); + tlhCluster = nmdCreate(&conf); + EXPECT_NONNULL(tlhCluster); + EXPECT_ZERO(nmdWaitClusterUp(tlhCluster)); + + for (i = 0; i < tlhNumThreads; i++) { + EXPECT_ZERO(pthread_create(&ti[i].thread, NULL, + testHdfsOperations, &ti[i])); + } + for (i = 0; i < tlhNumThreads; i++) { + EXPECT_ZERO(pthread_join(ti[i].thread, NULL)); + } + + EXPECT_ZERO(nmdShutdown(tlhCluster)); + nmdFree(tlhCluster); + EXPECT_ZERO(sem_destroy(&tlhSem)); + return checkFailures(ti, tlhNumThreads); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/test_native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/test_native_mini_dfs.c new file mode 100644 index 0000000000..b97ef953ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/test_native_mini_dfs.c @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "native_mini_dfs.h" + +#include + +static struct NativeMiniDfsConf conf = { + .doFormat = 1, +}; + +/** + * Test that we can create a MiniDFSCluster and shut it down. + */ +int main(void) { + struct NativeMiniDfsCluster* cl; + + cl = nmdCreate(&conf); + EXPECT_NONNULL(cl); + EXPECT_ZERO(nmdWaitClusterUp(cl)); + EXPECT_ZERO(nmdShutdown(cl)); + nmdFree(cl); + + return 0; +}