diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c index b36ef76e62..6938109d53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c @@ -182,6 +182,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) } (*env)->DeleteLocalRef(env, val.l); } + if (conf->numDataNodes) { + jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, + "numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " + "Builder::numDataNodes"); + goto error; + } + } + (*env)->DeleteLocalRef(env, val.l); jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "build", "()L" MINIDFS_CLUSTER ";"); if (jthr) { @@ -291,7 +301,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, jthrowable jthr; int ret = 0; const char *host; - + if (!env) { fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n"); return -EIO; @@ -306,7 +316,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, return -EIO; } jNameNode = jVal.l; - + // Then get the http address (InetSocketAddress) of the NameNode jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE, "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";"); @@ -317,7 +327,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, goto error_dlr_nn; } jAddress = jVal.l; - + jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, "getPort", "()I"); if (jthr) { @@ -327,7 +337,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, goto error_dlr_addr; } *port = jVal.i; - + jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;"); if (jthr) { @@ -339,12 +349,12 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, host = (*env)->GetStringUTFChars(env, jVal.l, NULL); *hostName = strdup(host); (*env)->ReleaseStringUTFChars(env, jVal.l, host); - + error_dlr_addr: (*env)->DeleteLocalRef(env, jAddress); error_dlr_nn: (*env)->DeleteLocalRef(env, jNameNode); - + return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h index ce8b1cfdab..628180f087 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h @@ -26,7 +26,7 @@ extern "C" { #endif struct hdfsBuilder; -struct NativeMiniDfsCluster; +struct NativeMiniDfsCluster; /** * Represents a configuration to use for creating a Native MiniDFSCluster @@ -51,6 +51,11 @@ struct NativeMiniDfsConf { * Nonzero if we should configure short circuit. */ jboolean configureShortCircuit; + + /** + * The number of datanodes in MiniDfsCluster + */ + jint numDataNodes; }; /** @@ -96,13 +101,13 @@ void nmdFree(struct NativeMiniDfsCluster* cl); * * @return the port, or a negative error code */ -int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); +int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); /** * Get the http address that's in use by the given (non-HA) nativeMiniDfs * * @param cl The initialized NativeMiniDfsCluster - * @param port Used to capture the http port of the NameNode + * @param port Used to capture the http port of the NameNode * of the NativeMiniDfsCluster * @param hostName Used to capture the http hostname of the NameNode * of the NativeMiniDfsCluster diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c new file mode 100644 index 0000000000..0d01e447c3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs/hdfs.h" +#include "hdfspp/hdfs_ext.h" +#include "native_mini_dfs.h" +#include "os/thread.h" + +#include +#include +#include +#include +#include +#include + +#define TO_STR_HELPER(X) #X +#define TO_STR(X) TO_STR_HELPER(X) + +#define TLH_MAX_THREADS 10000 + +#define TLH_MAX_DNS 16 + +#define TLH_DEFAULT_BLOCK_SIZE 1048576 + +#define TLH_DEFAULT_DFS_REPLICATION 3 + +#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100 + +#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5 + +#ifndef RANDOM_ERROR_RATIO +#define RANDOM_ERROR_RATIO 1000000000 +#endif + +struct tlhThreadInfo { + /** Thread index */ + int threadIdx; + /** 0 = thread was successful; error code otherwise */ + int success; + /** thread identifier */ + thread theThread; + /** fs, shared with other threads **/ + hdfsFS hdfs; + /** Filename */ + const char *fileNm; + +}; + +static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs, + const char *username) +{ + int ret; + tPort port; + hdfsFS hdfs; + struct hdfsBuilder *bld; + + port = (tPort)nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort " + "returned error %d\n", port); + return port; + } + bld = hdfsNewBuilder(); + if (!bld) + return -ENOMEM; + hdfsBuilderSetForceNewInstance(bld); + hdfsBuilderSetNameNode(bld, "localhost"); + hdfsBuilderSetNameNodePort(bld, port); + hdfsBuilderConfSetStr(bld, "dfs.block.size", + TO_STR(TLH_DEFAULT_BLOCK_SIZE)); + hdfsBuilderConfSetStr(bld, "dfs.blocksize", + TO_STR(TLH_DEFAULT_BLOCK_SIZE)); + hdfsBuilderConfSetStr(bld, "dfs.replication", + TO_STR(TLH_DEFAULT_DFS_REPLICATION)); + hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries", + TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES)); + hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval", + TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS)); + if (username) { + hdfsBuilderSetUserName(bld, username); + } + hdfs = hdfsBuilderConnect(bld); + if (!hdfs) { + ret = -errno; + return ret; + } + *fs = hdfs; + return 0; +} + +static int hdfsWriteData(hdfsFS hdfs, const char *dirNm, + const char *fileNm, tSize fileSz) +{ + hdfsFile file; + int ret, expected; + const char *content; + + content = fileNm; + + if (hdfsExists(hdfs, dirNm) == 0) { + EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1)); + } + EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm)); + + file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0); + EXPECT_NONNULL(file); + + expected = (int)strlen(content); + tSize sz = 0; + while (sz < fileSz) { + ret = hdfsWrite(hdfs, file, content, expected); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret); + return ret; + } + if (ret != expected) { + fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but " + "it wrote %d\n", ret, expected); + return EIO; + } + sz += ret; + } + EXPECT_ZERO(hdfsFlush(hdfs, file)); + EXPECT_ZERO(hdfsHSync(hdfs, file)); + EXPECT_ZERO(hdfsCloseFile(hdfs, file)); + return 0; +} + +static int fileEventCallback1(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie) +{ + char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO"); + int64_t randomErrRatio = RANDOM_ERROR_RATIO; + if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr); + if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR; + else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK; + return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK; +} + +static int fileEventCallback2(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie) +{ + /* no op */ + return LIBHDFSPP_EVENT_OK; +} + +static int doTestHdfsMiniStress(struct tlhThreadInfo *ti, int randomErr) +{ + char tmp[4096]; + hdfsFile file; + int ret, expected; + hdfsFileInfo *fileInfo; + uint64_t readOps, nErrs=0; + tOffset seekPos; + const char *content; + + content = ti->fileNm; + expected = (int)strlen(content); + + fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm); + EXPECT_NONNULL(fileInfo); + + file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + + libhdfspp_file_event_callback callback = (randomErr != 0) ? &fileEventCallback1 : &fileEventCallback2; + + hdfsPreAttachFileMonitor(callback, 0); + + fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n", + ti->threadIdx); + for (readOps=0; readOps < 1000; ++readOps) { + EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file)); + file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected); + seekPos = (seekPos / expected) * expected; + ret = hdfsSeek(ti->hdfs, file, seekPos); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set" + " errno %d\n", seekPos, ret); + ++nErrs; + continue; + } + ret = hdfsRead(ti->hdfs, file, tmp, expected); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsRead failed and set errno %d\n", ret); + ++nErrs; + continue; + } + if (ret != expected) { + fprintf(stderr, "hdfsRead was supposed to read %d bytes, but " + "it read %d\n", ret, expected); + ++nErrs; + continue; + } + ret = memcmp(content, tmp, expected); + if (ret) { + fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)", + expected, tmp, expected, content); + ++nErrs; + continue; + } + } + EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file)); + fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n", + ti->threadIdx); + EXPECT_ZERO(nErrs); + return 0; +} + +static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti) +{ + fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n", + ti->threadIdx); + EXPECT_NONNULL(ti->hdfs); + EXPECT_ZERO(doTestHdfsMiniStress(ti, 1)); + EXPECT_ZERO(doTestHdfsMiniStress(ti, 0)); + return 0; +} + +static void testHdfsMiniStress(void *v) +{ + struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v; + int ret = testHdfsMiniStressImpl(ti); + ti->success = ret; +} + +static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads) +{ + int i, threadsFailed = 0; + const char *sep = ""; + + for (i = 0; i < tlhNumThreads; i++) { + if (ti[i].success != 0) { + threadsFailed = 1; + } + } + if (!threadsFailed) { + fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded. SUCCESS.\n"); + return EXIT_SUCCESS; + } + fprintf(stderr, "testLibHdfsMiniStress: some threads failed: ["); + for (i = 0; i < tlhNumThreads; i++) { + if (ti[i].success != 0) { + fprintf(stderr, "%s%d", sep, i); + sep = ", "; + } + } + fprintf(stderr, "]. FAILURE.\n"); + return EXIT_FAILURE; +} + +/** + * Test intended to stress libhdfs client with concurrent requests. Currently focused + * on concurrent reads. + */ +int main(void) +{ + int i, tlhNumThreads; + char *dirNm, *fileNm; + tSize fileSz; + const char *tlhNumThreadsStr, *tlhNumDNsStr; + hdfsFS hdfs = NULL; + struct NativeMiniDfsCluster* tlhCluster; + struct tlhThreadInfo ti[TLH_MAX_THREADS]; + struct NativeMiniDfsConf conf = { + 1, /* doFormat */ + }; + + dirNm = "/tlhMiniStressData"; + fileNm = "/tlhMiniStressData/file"; + fileSz = 2*1024*1024; + + tlhNumDNsStr = getenv("TLH_NUM_DNS"); + if (!tlhNumDNsStr) { + tlhNumDNsStr = "1"; + } + conf.numDataNodes = atoi(tlhNumDNsStr); + if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) { + fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes " + "between 1 and %d inclusive, not %d\n", + TLH_MAX_DNS, conf.numDataNodes); + return EXIT_FAILURE; + } + + tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); + if (!tlhNumThreadsStr) { + tlhNumThreadsStr = "8"; + } + tlhNumThreads = atoi(tlhNumThreadsStr); + if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) { + fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads " + "between 1 and %d inclusive, not %d\n", + TLH_MAX_THREADS, tlhNumThreads); + return EXIT_FAILURE; + } + memset(&ti[0], 0, sizeof(ti)); + for (i = 0; i < tlhNumThreads; i++) { + ti[i].threadIdx = i; + } + + tlhCluster = nmdCreate(&conf); + EXPECT_NONNULL(tlhCluster); + EXPECT_ZERO(nmdWaitClusterUp(tlhCluster)); + + EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL)); + + // Single threaded writes for now. + EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz)); + + // Multi-threaded reads. + for (i = 0; i < tlhNumThreads; i++) { + ti[i].theThread.start = testHdfsMiniStress; + ti[i].theThread.arg = &ti[i]; + ti[i].hdfs = hdfs; + ti[i].fileNm = fileNm; + EXPECT_ZERO(threadCreate(&ti[i].theThread)); + } + for (i = 0; i < tlhNumThreads; i++) { + EXPECT_ZERO(threadJoin(&ti[i].theThread)); + } + + EXPECT_ZERO(hdfsDisconnect(hdfs)); + EXPECT_ZERO(nmdShutdown(tlhCluster)); + nmdFree(tlhCluster); + return checkFailures(ti, tlhNumThreads); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h index 82109fd1e6..43187a5080 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h @@ -48,10 +48,8 @@ public: enum event_response_type { kOk = 0, -#ifndef NDEBUG // Responses to be used in testing only kTest_Error = 100 -#endif }; @@ -70,10 +68,9 @@ private: // // Testing support // -// If running a debug build, the consumer can stimulate errors +// The consumer can stimulate errors // within libhdfdspp by returning a Status from the callback. /////////////////////////////////////////////// -#ifndef NDEBUG public: static event_response test_err(const Status &status) { return event_response(status); @@ -86,7 +83,6 @@ private: response_(event_response_type::kTest_Error), error_status_(status) {} Status error_status_; // To be used with kTest_Error -#endif }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index af7393f159..6ec3a4b582 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -235,9 +235,7 @@ extern const char * FILE_DN_WRITE_EVENT; #define LIBHDFSPP_EVENT_OK (0) -#ifndef NDEBUG - #define DEBUG_SIMULATE_ERROR (-1) -#endif +#define DEBUG_SIMULATE_ERROR (-1) typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster, int64_t value, int64_t cookie); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 04065b2177..a42feae0b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -331,6 +331,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, Error(stat); return nullptr; } + if (f && fileEventCallback) { + f->SetFileEventCallback(fileEventCallback.value()); + } return new hdfsFile_internal(f); } catch (const std::exception & e) { ReportException(e); @@ -959,7 +962,7 @@ event_response fs_callback_glue(libhdfspp_fs_event_callback handler, if (result == LIBHDFSPP_EVENT_OK) { return event_response::ok(); } -#ifndef NDEBUG +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (result == DEBUG_SIMULATE_ERROR) { return event_response::test_err(Status::Error("Simulated error")); } @@ -978,7 +981,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler, if (result == LIBHDFSPP_EVENT_OK) { return event_response::ok(); } -#ifndef NDEBUG +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (result == DEBUG_SIMULATE_ERROR) { return event_response::test_err(Status::Error("Simulated error")); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index 38d50f6eb7..df147d3b2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -239,7 +239,7 @@ void FileHandleImpl::AsyncPreadSome( // Wrap the DN in a block reader to handle the state and logic of the // block request protocol std::shared_ptr reader; - reader = CreateBlockReader(BlockReaderOptions(), dn); + reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_); // Lambdas cannot capture copies of member variables so we'll make explicit // copies for it @@ -248,8 +248,8 @@ void FileHandleImpl::AsyncPreadSome( auto cluster_name = cluster_name_; auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { - auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); -#ifndef NDEBUG + event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (event_resp.response() == event_response::kTest_Error) { handler(event_resp.status(), dn_id, transferred); return; @@ -262,8 +262,8 @@ void FileHandleImpl::AsyncPreadSome( auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] (Status status, std::shared_ptr dn) { (void)dn; - auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); -#ifndef NDEBUG + event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (event_resp.response() == event_response::kTest_Error) { status = event_resp.status(); } @@ -284,9 +284,10 @@ void FileHandleImpl::AsyncPreadSome( } std::shared_ptr FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn) + std::shared_ptr dn, + std::shared_ptr event_handlers) { - std::shared_ptr reader = std::make_shared(options, dn, cancel_state_); + std::shared_ptr reader = std::make_shared(options, dn, cancel_state_, event_handlers); LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR << ", ..., dnconn=" << dn.get() diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index a99550a13c..57cf4b77a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -119,7 +119,8 @@ public: protected: virtual std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn); + std::shared_ptr dn, + std::shared_ptr event_handlers); virtual std::shared_ptr CreateDataNodeConnection( ::asio::io_service *io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index 50529511d6..6098b9cde1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -24,7 +24,6 @@ #include - namespace hdfs { #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ @@ -105,7 +104,17 @@ void BlockReaderImpl::AsyncRequestBlock( m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; if (stat.ok()) { const auto &resp = s.response; - if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { + + if(this->event_handlers_) { + event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (stat.ok() && event_resp.response() == event_response::kTest_Error) { + stat = Status::Error("Test error"); + } +#endif + } + + if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) { if (resp.has_readopchecksuminfo()) { const auto &checksum_info = resp.readopchecksuminfo(); chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); @@ -162,6 +171,14 @@ struct BlockReaderImpl::ReadPacketHeader assert(v && "Failed to parse the header"); parent_->state_ = kReadChecksum; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; @@ -214,7 +231,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { return; } - auto handler = [parent, next](const asio::error_code &ec, size_t) { + auto handler = [parent, next, this](const asio::error_code &ec, size_t) { Status status; if (ec) { status = Status(ec.value(), ec.message().c_str()); @@ -222,6 +239,14 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData; } + if(parent->event_handlers_) { + event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; parent->checksum_.resize(parent->packet_len_ - sizeof(int) - @@ -248,7 +273,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { virtual void Run(const Next &next) override { LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); - auto handler = [next, this](const asio::error_code &ec, size_t transferred) { Status status; @@ -261,6 +285,14 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { parent_->state_ = kReadPacketHeader; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; @@ -292,13 +324,22 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation { return; } - auto h = [next, this](const Status &status) { + auto h = [next, this](const Status &stat) { + Status status = stat; if (status.ok()) { assert(reinterpret_cast(*bytes_transferred_) == parent_->chunk_padding_bytes_); parent_->chunk_padding_bytes_ = 0; parent_->state_ = kReadData; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; read_data_->Run(h); @@ -334,11 +375,20 @@ struct BlockReaderImpl::AckRead : continuation::Continuation { m->Push( continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); - m->Run([this, next](const Status &status, + m->Run([this, next](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &) { + Status status = stat; if (status.ok()) { parent_->state_ = BlockReaderImpl::kFinished; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index f9794b1e1b..b5cbdf5a75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -93,9 +93,9 @@ class BlockReaderImpl : public BlockReader, public std::enable_shared_from_this { public: explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr dn, - CancelHandle cancel_state) + CancelHandle cancel_state, std::shared_ptr event_handlers=nullptr) : dn_(dn), state_(kOpen), options_(options), - chunk_padding_bytes_(0), cancel_state_(cancel_state) {} + chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} virtual void AsyncReadPacket( const MutableBuffers &buffers, @@ -152,6 +152,7 @@ private: long long bytes_to_read_; std::vector checksum_; CancelHandle cancel_state_; + LibhdfsEvents* event_handlers_; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index 8567932a43..be6d7bdb20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -274,9 +274,18 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr response) { } Status status; - if (h.has_exceptionclassname()) { + if(event_handlers_) { + event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (event_resp.response() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + } + + if (status.ok() && h.has_exceptionclassname()) { status = - Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); + Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); } io_service().post([req, response, status]() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 2b47ce12de..330f9b170f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -30,6 +30,8 @@ #include #include +#include + namespace hdfs { template @@ -72,8 +74,9 @@ RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), next_layer_(engine->io_service()), - connect_timer_(engine->io_service()) { - LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); + connect_timer_(engine->io_service()) +{ + LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } template @@ -87,7 +90,6 @@ RpcConnectionImpl::~RpcConnectionImpl() { LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } - template void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> &server, @@ -171,8 +173,8 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec, Status status = ToStatus(ec); if(event_handlers_) { - auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); -#ifndef NDEBUG + event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (event_resp.response() == event_response::kTest_Error) { status = event_resp.status(); } @@ -349,27 +351,28 @@ void RpcConnectionImpl::FlushPendingRequests() { template -void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &asio_ec, +void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &original_ec, size_t) { using std::placeholders::_1; using std::placeholders::_2; std::lock_guard state_lock(connection_state_lock_); + ::asio::error_code my_ec(original_ec); + LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); std::shared_ptr shared_this = shared_from_this(); - ::asio::error_code ec = asio_ec; if(event_handlers_) { - auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); -#ifndef NDEBUG + event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (event_resp.response() == event_response::kTest_Error) { - ec = std::make_error_code(std::errc::network_down); + my_ec = std::make_error_code(std::errc::network_down); } #endif } - switch (ec.value()) { + switch (my_ec.value()) { case 0: // No errors break; @@ -377,8 +380,8 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &asi // The event loop has been shut down. Ignore the error. return; default: - LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message()); - CommsError(ToStatus(ec)); + LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message()); + CommsError(ToStatus(my_ec)); return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index d0365c3b43..5de7d53d03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -230,7 +230,6 @@ class RpcConnection : public std::enable_shared_from_this { std::shared_ptr event_handlers_; std::string cluster_name_; - // Lock for mutable parts of this class that need to be thread safe std::mutex connection_state_lock_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index b30afb9cdc..032dfc8814 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -22,6 +22,9 @@ set (LIBHDFS_TESTS_DIR ../../libhdfs-tests) set (LIBHDFSPP_SRC_DIR ..) set (LIBHDFSPP_LIB_DIR ${LIBHDFSPP_SRC_DIR}/lib) set (LIBHDFSPP_BINDING_C ${LIBHDFSPP_LIB_DIR}/bindings/c) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-missing-field-initializers") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-missing-field-initializers") + include_directories( ${GENERATED_JAVAH} ${CMAKE_CURRENT_LIST_DIR} @@ -138,6 +141,10 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static) +build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c) +link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) +add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static) + build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc) link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfs_ext hdfspp_test_shim_static) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 01d723f281..9e3aeb74f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -93,9 +93,10 @@ public: std::shared_ptr mock_reader_ = std::make_shared(); protected: std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn) override + std::shared_ptr dn, + std::shared_ptr event_handlers) override { - (void) options; (void) dn; + (void) options; (void) dn; (void) event_handlers; assert(mock_reader_); return mock_reader_; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index b5f4d9ad7c..defe95d12b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks) }); io_service.run(); ASSERT_TRUE(complete); - ASSERT_EQ(7, callbacks.size()); + ASSERT_EQ(8, callbacks.size()); ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error