diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml index 39372e62ed..0574b25f56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml @@ -147,7 +147,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> - + - + diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c index b36ef76e62..6938109d53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c @@ -182,6 +182,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) } (*env)->DeleteLocalRef(env, val.l); } + if (conf->numDataNodes) { + jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, + "numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " + "Builder::numDataNodes"); + goto error; + } + } + (*env)->DeleteLocalRef(env, val.l); jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "build", "()L" MINIDFS_CLUSTER ";"); if (jthr) { @@ -291,7 +301,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, jthrowable jthr; int ret = 0; const char *host; - + if (!env) { fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n"); return -EIO; @@ -306,7 +316,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, return -EIO; } jNameNode = jVal.l; - + // Then get the http address (InetSocketAddress) of the NameNode jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE, "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";"); @@ -317,7 +327,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, goto error_dlr_nn; } jAddress = jVal.l; - + jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, "getPort", "()I"); if (jthr) { @@ -327,7 +337,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, goto error_dlr_addr; } *port = jVal.i; - + jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;"); if (jthr) { @@ -339,12 +349,12 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, host = (*env)->GetStringUTFChars(env, jVal.l, NULL); *hostName = strdup(host); (*env)->ReleaseStringUTFChars(env, jVal.l, host); - + error_dlr_addr: (*env)->DeleteLocalRef(env, jAddress); error_dlr_nn: (*env)->DeleteLocalRef(env, jNameNode); - + return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h index ce8b1cfdab..628180f087 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h @@ -26,7 +26,7 @@ extern "C" { #endif struct hdfsBuilder; -struct NativeMiniDfsCluster; +struct NativeMiniDfsCluster; /** * Represents a configuration to use for creating a Native MiniDFSCluster @@ -51,6 +51,11 @@ struct NativeMiniDfsConf { * Nonzero if we should configure short circuit. */ jboolean configureShortCircuit; + + /** + * The number of datanodes in MiniDfsCluster + */ + jint numDataNodes; }; /** @@ -96,13 +101,13 @@ void nmdFree(struct NativeMiniDfsCluster* cl); * * @return the port, or a negative error code */ -int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); +int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); /** * Get the http address that's in use by the given (non-HA) nativeMiniDfs * * @param cl The initialized NativeMiniDfsCluster - * @param port Used to capture the http port of the NameNode + * @param port Used to capture the http port of the NameNode * of the NativeMiniDfsCluster * @param hostName Used to capture the http hostname of the NameNode * of the NativeMiniDfsCluster diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c new file mode 100644 index 0000000000..71db8eed34 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs/hdfs.h" +#include "hdfspp/hdfs_ext.h" +#include "native_mini_dfs.h" +#include "os/thread.h" + +#include +#include +#include +#include +#include +#include + +#define TO_STR_HELPER(X) #X +#define TO_STR(X) TO_STR_HELPER(X) + +#define TLH_MAX_THREADS 10000 + +#define TLH_MAX_DNS 16 + +#define TLH_DEFAULT_BLOCK_SIZE 1048576 + +#define TLH_DEFAULT_DFS_REPLICATION 3 + +#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100 + +#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5 + +#ifndef RANDOM_ERROR_RATIO +#define RANDOM_ERROR_RATIO 1000000000 +#endif + +struct tlhThreadInfo { + /** Thread index */ + int threadIdx; + /** 0 = thread was successful; error code otherwise */ + int success; + /** thread identifier */ + thread theThread; + /** fs, shared with other threads **/ + hdfsFS hdfs; + /** Filename */ + const char *fileNm; + +}; + +static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs, + const char *username) +{ + int ret; + tPort port; + hdfsFS hdfs; + struct hdfsBuilder *bld; + + port = (tPort)nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort " + "returned error %d\n", port); + return port; + } + bld = hdfsNewBuilder(); + if (!bld) + return -ENOMEM; + hdfsBuilderSetForceNewInstance(bld); + hdfsBuilderSetNameNode(bld, "localhost"); + hdfsBuilderSetNameNodePort(bld, port); + hdfsBuilderConfSetStr(bld, "dfs.block.size", + TO_STR(TLH_DEFAULT_BLOCK_SIZE)); + hdfsBuilderConfSetStr(bld, "dfs.blocksize", + TO_STR(TLH_DEFAULT_BLOCK_SIZE)); + hdfsBuilderConfSetStr(bld, "dfs.replication", + TO_STR(TLH_DEFAULT_DFS_REPLICATION)); + hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries", + TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES)); + hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval", + TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS)); + if (username) { + hdfsBuilderSetUserName(bld, username); + } + hdfs = hdfsBuilderConnect(bld); + if (!hdfs) { + ret = -errno; + return ret; + } + *fs = hdfs; + return 0; +} + +static int hdfsWriteData(hdfsFS hdfs, const char *dirNm, + const char *fileNm, tSize fileSz) +{ + hdfsFile file; + int ret, expected; + const char *content; + + content = fileNm; + + if (hdfsExists(hdfs, dirNm) == 0) { + EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1)); + } + EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm)); + + file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0); + EXPECT_NONNULL(file); + + expected = (int)strlen(content); + tSize sz = 0; + while (sz < fileSz) { + ret = hdfsWrite(hdfs, file, content, expected); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret); + return ret; + } + if (ret != expected) { + fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but " + "it wrote %d\n", ret, expected); + return EIO; + } + sz += ret; + } + EXPECT_ZERO(hdfsFlush(hdfs, file)); + EXPECT_ZERO(hdfsHSync(hdfs, file)); + EXPECT_ZERO(hdfsCloseFile(hdfs, file)); + return 0; +} + +static int fileEventCallback(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie) +{ + char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO"); + int64_t randomErrRatio = RANDOM_ERROR_RATIO; + if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr); + if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR; + else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK; + return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK; +} + +static int doTestHdfsMiniStress(struct tlhThreadInfo *ti) +{ + char tmp[4096]; + hdfsFile file; + int ret, expected; + hdfsFileInfo *fileInfo; + uint64_t readOps, nErrs=0; + tOffset seekPos; + const char *content; + + content = ti->fileNm; + expected = (int)strlen(content); + + fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm); + EXPECT_NONNULL(fileInfo); + + file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + + libhdfspp_file_event_callback callback = &fileEventCallback; + + hdfsPreAttachFileMonitor(callback, 0); + + fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n", + ti->threadIdx); + for (readOps=0; readOps < 1000; ++readOps) { + EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file)); + file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected); + seekPos = (seekPos / expected) * expected; + ret = hdfsSeek(ti->hdfs, file, seekPos); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set" + " errno %d\n", seekPos, ret); + ++nErrs; + continue; + } + ret = hdfsRead(ti->hdfs, file, tmp, expected); + if (ret < 0) { + ret = errno; + fprintf(stderr, "hdfsRead failed and set errno %d\n", ret); + ++nErrs; + continue; + } + if (ret != expected) { + fprintf(stderr, "hdfsRead was supposed to read %d bytes, but " + "it read %d\n", ret, expected); + ++nErrs; + continue; + } + ret = memcmp(content, tmp, expected); + if (ret) { + fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)", + expected, tmp, expected, content); + ++nErrs; + continue; + } + } + EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file)); + fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n", + ti->threadIdx); + EXPECT_ZERO(nErrs); + return 0; +} + +static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti) +{ + fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n", + ti->threadIdx); + EXPECT_NONNULL(ti->hdfs); + EXPECT_ZERO(doTestHdfsMiniStress(ti)); + return 0; +} + +static void testHdfsMiniStress(void *v) +{ + struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v; + int ret = testHdfsMiniStressImpl(ti); + ti->success = ret; +} + +static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads) +{ + int i, threadsFailed = 0; + const char *sep = ""; + + for (i = 0; i < tlhNumThreads; i++) { + if (ti[i].success != 0) { + threadsFailed = 1; + } + } + if (!threadsFailed) { + fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded. SUCCESS.\n"); + return EXIT_SUCCESS; + } + fprintf(stderr, "testLibHdfsMiniStress: some threads failed: ["); + for (i = 0; i < tlhNumThreads; i++) { + if (ti[i].success != 0) { + fprintf(stderr, "%s%d", sep, i); + sep = ", "; + } + } + fprintf(stderr, "]. FAILURE.\n"); + return EXIT_FAILURE; +} + +/** + * Test intended to stress libhdfs client with concurrent requests. Currently focused + * on concurrent reads. + */ +int main(void) +{ + int i, tlhNumThreads; + char *dirNm, *fileNm; + tSize fileSz; + const char *tlhNumThreadsStr, *tlhNumDNsStr; + hdfsFS hdfs = NULL; + struct NativeMiniDfsCluster* tlhCluster; + struct tlhThreadInfo ti[TLH_MAX_THREADS]; + struct NativeMiniDfsConf conf = { + 1, /* doFormat */ + }; + + dirNm = "/tlhMiniStressData"; + fileNm = "/tlhMiniStressData/file"; + fileSz = 2*1024*1024; + + tlhNumDNsStr = getenv("TLH_NUM_DNS"); + if (!tlhNumDNsStr) { + tlhNumDNsStr = "1"; + } + conf.numDataNodes = atoi(tlhNumDNsStr); + if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) { + fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes " + "between 1 and %d inclusive, not %d\n", + TLH_MAX_DNS, conf.numDataNodes); + return EXIT_FAILURE; + } + + tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); + if (!tlhNumThreadsStr) { + tlhNumThreadsStr = "8"; + } + tlhNumThreads = atoi(tlhNumThreadsStr); + if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) { + fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads " + "between 1 and %d inclusive, not %d\n", + TLH_MAX_THREADS, tlhNumThreads); + return EXIT_FAILURE; + } + memset(&ti[0], 0, sizeof(ti)); + for (i = 0; i < tlhNumThreads; i++) { + ti[i].threadIdx = i; + } + + tlhCluster = nmdCreate(&conf); + EXPECT_NONNULL(tlhCluster); + EXPECT_ZERO(nmdWaitClusterUp(tlhCluster)); + + EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL)); + + // Single threaded writes for now. + EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz)); + + // Multi-threaded reads. + for (i = 0; i < tlhNumThreads; i++) { + ti[i].theThread.start = testHdfsMiniStress; + ti[i].theThread.arg = &ti[i]; + ti[i].hdfs = hdfs; + ti[i].fileNm = fileNm; + EXPECT_ZERO(threadCreate(&ti[i].theThread)); + } + for (i = 0; i < tlhNumThreads; i++) { + EXPECT_ZERO(threadJoin(&ti[i].theThread)); + } + + EXPECT_ZERO(hdfsDisconnect(hdfs)); + EXPECT_ZERO(nmdShutdown(tlhCluster)); + nmdFree(tlhCluster); + return checkFailures(ti, tlhNumThreads); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 0fe84795c8..3f19ae3743 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -298,6 +298,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, Error(stat); return nullptr; } + if (f && fileEventCallback) { + f->SetFileEventCallback(fileEventCallback.value()); + } return new hdfsFile_internal(f); } catch (const std::exception & e) { ReportException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index 471281aa5c..9f9311f16a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -231,7 +231,7 @@ void FileHandleImpl::AsyncPreadSome( // Wrap the DN in a block reader to handle the state and logic of the // block request protocol std::shared_ptr reader; - reader = CreateBlockReader(BlockReaderOptions(), dn); + reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_); // Lambdas cannot capture copies of member variables so we'll make explicit // copies for it @@ -240,7 +240,7 @@ void FileHandleImpl::AsyncPreadSome( auto cluster_name = cluster_name_; auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { - auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); + event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { handler(event_resp.status(), dn_id, transferred); @@ -254,7 +254,7 @@ void FileHandleImpl::AsyncPreadSome( auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] (Status status, std::shared_ptr dn) { (void)dn; - auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); + event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { status = event_resp.status(); @@ -276,9 +276,10 @@ void FileHandleImpl::AsyncPreadSome( } std::shared_ptr FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn) + std::shared_ptr dn, + std::shared_ptr event_handlers) { - std::shared_ptr reader = std::make_shared(options, dn, cancel_state_); + std::shared_ptr reader = std::make_shared(options, dn, cancel_state_, event_handlers); LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR << ", ..., dnconn=" << dn.get() diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index a99550a13c..57cf4b77a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -119,7 +119,8 @@ public: protected: virtual std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn); + std::shared_ptr dn, + std::shared_ptr event_handlers); virtual std::shared_ptr CreateDataNodeConnection( ::asio::io_service *io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index b05bc3d541..0d4be41eca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -96,7 +96,7 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n io_service = nullptr; /* spawn background threads for asio delegation */ - unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */; + unsigned int threads = 2 /* options.io_threads_, pending HDFS-9117 */; for (unsigned int i = 0; i < threads; i++) { AddWorkerThread(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index 50529511d6..defcc1a143 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -24,7 +24,6 @@ #include - namespace hdfs { #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ @@ -105,7 +104,17 @@ void BlockReaderImpl::AsyncRequestBlock( m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; if (stat.ok()) { const auto &resp = s.response; - if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { + + if(this->event_handlers_) { + event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef NDEBUG + if (stat.ok() && event_resp.response() == event_response::kTest_Error) { + stat = Status::Error("Test error"); + } +#endif + } + + if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) { if (resp.has_readopchecksuminfo()) { const auto &checksum_info = resp.readopchecksuminfo(); chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); @@ -162,6 +171,14 @@ struct BlockReaderImpl::ReadPacketHeader assert(v && "Failed to parse the header"); parent_->state_ = kReadChecksum; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef NDEBUG + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; @@ -214,7 +231,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { return; } - auto handler = [parent, next](const asio::error_code &ec, size_t) { + auto handler = [parent, next, this](const asio::error_code &ec, size_t) { Status status; if (ec) { status = Status(ec.value(), ec.message().c_str()); @@ -222,6 +239,14 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData; } + if(parent->event_handlers_) { + event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef NDEBUG + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; parent->checksum_.resize(parent->packet_len_ - sizeof(int) - @@ -248,7 +273,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { virtual void Run(const Next &next) override { LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); - auto handler = [next, this](const asio::error_code &ec, size_t transferred) { Status status; @@ -261,6 +285,14 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { parent_->state_ = kReadPacketHeader; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef NDEBUG + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; @@ -292,13 +324,22 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation { return; } - auto h = [next, this](const Status &status) { + auto h = [next, this](const Status &stat) { + Status status = stat; if (status.ok()) { assert(reinterpret_cast(*bytes_transferred_) == parent_->chunk_padding_bytes_); parent_->chunk_padding_bytes_ = 0; parent_->state_ = kReadData; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef NDEBUG + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }; read_data_->Run(h); @@ -334,11 +375,20 @@ struct BlockReaderImpl::AckRead : continuation::Continuation { m->Push( continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); - m->Run([this, next](const Status &status, + m->Run([this, next](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &) { + Status status = stat; if (status.ok()) { parent_->state_ = BlockReaderImpl::kFinished; } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef NDEBUG + if (status.ok() && event_resp.response() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } next(status); }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index f9794b1e1b..b5cbdf5a75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -93,9 +93,9 @@ class BlockReaderImpl : public BlockReader, public std::enable_shared_from_this { public: explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr dn, - CancelHandle cancel_state) + CancelHandle cancel_state, std::shared_ptr event_handlers=nullptr) : dn_(dn), state_(kOpen), options_(options), - chunk_padding_bytes_(0), cancel_state_(cancel_state) {} + chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} virtual void AsyncReadPacket( const MutableBuffers &buffers, @@ -152,6 +152,7 @@ private: long long bytes_to_read_; std::vector checksum_; CancelHandle cancel_state_; + LibhdfsEvents* event_handlers_; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index 749195a278..a72d194a41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -274,9 +274,18 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr response) { } Status status; - if (h.has_exceptionclassname()) { + if(event_handlers_) { + event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); +#ifndef NDEBUG + if (event_resp.response() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + } + + if (status.ok() && h.has_exceptionclassname()) { status = - Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); + Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); } io_service().post([req, response, status]() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 255b98be80..70a96b0399 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -30,6 +30,8 @@ #include #include +#include + namespace hdfs { template @@ -63,6 +65,10 @@ public: NextLayer next_layer_; void ConnectComplete(const ::asio::error_code &ec); + + // Hide default ctors. + RpcConnectionImpl(); + RpcConnectionImpl(const RpcConnectionImpl &other); }; template @@ -70,7 +76,7 @@ RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), next_layer_(engine->io_service()) { - LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); + LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } template @@ -84,7 +90,6 @@ RpcConnectionImpl::~RpcConnectionImpl() { LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } - template void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> &server, @@ -145,7 +150,7 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) Status status = ToStatus(ec); if(event_handlers_) { - auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); + event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { status = event_resp.status(); @@ -310,27 +315,28 @@ void RpcConnectionImpl::FlushPendingRequests() { template -void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &asio_ec, +void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &original_ec, size_t) { using std::placeholders::_1; using std::placeholders::_2; std::lock_guard state_lock(connection_state_lock_); + ::asio::error_code my_ec(original_ec); + LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); std::shared_ptr shared_this = shared_from_this(); - ::asio::error_code ec = asio_ec; if(event_handlers_) { - auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); + event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { - ec = std::make_error_code(std::errc::network_down); + my_ec = std::make_error_code(std::errc::network_down); } #endif } - switch (ec.value()) { + switch (my_ec.value()) { case 0: // No errors break; @@ -338,8 +344,8 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &asi // The event loop has been shut down. Ignore the error. return; default: - LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message()); - CommsError(ToStatus(ec)); + LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message()); + CommsError(ToStatus(my_ec)); return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 066c01f56e..5f7e618e32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -229,7 +229,6 @@ class RpcConnection : public std::enable_shared_from_this { std::shared_ptr event_handlers_; std::string cluster_name_; - // Lock for mutable parts of this class that need to be thread safe std::mutex connection_state_lock_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index b30afb9cdc..45bbeb25de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -138,6 +138,10 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static) +build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c) +link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) +add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static) + build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc) link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfs_ext hdfspp_test_shim_static) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 01d723f281..9e3aeb74f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -93,9 +93,10 @@ public: std::shared_ptr mock_reader_ = std::make_shared(); protected: std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn) override + std::shared_ptr dn, + std::shared_ptr event_handlers) override { - (void) options; (void) dn; + (void) options; (void) dn; (void) event_handlers; assert(mock_reader_); return mock_reader_; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c index 0737d0816f..7613bf31df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c @@ -24,15 +24,9 @@ #include #include -/* Cheat for now and use the same hdfsBuilder as libhdfs */ -/* (libhdfspp doesn't have an hdfsBuilder yet). */ struct hdfsBuilder { - int forceNewInstance; - const char *nn; - tPort port; - const char *kerbTicketCachePath; - const char *userName; - struct hdfsBuilderConfOpt *opts; + struct libhdfs_hdfsBuilder * libhdfsBuilder; + struct libhdfspp_hdfsBuilder * libhdfsppBuilder; }; /* Shim structs and functions that delegate to libhdfspp and libhdfs. */ @@ -98,13 +92,13 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { hdfsFS ret = calloc(1, sizeof(struct hdfs_internal)); - ret->libhdfsppRep = libhdfspp_hdfsConnect(bld->nn, bld->port); + ret->libhdfsppRep = libhdfspp_hdfsBuilderConnect(bld->libhdfsppBuilder); if (!ret->libhdfsppRep) { free(ret); ret = NULL; } else { /* Destroys bld object. */ - ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld); + ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld->libhdfsBuilder); if (!ret->libhdfsRep) { libhdfspp_hdfsDisconnect(ret->libhdfsppRep); free(ret); @@ -115,49 +109,61 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { } struct hdfsBuilder *hdfsNewBuilder(void) { - return libhdfs_hdfsNewBuilder(); + struct hdfsBuilder * ret = calloc(1, sizeof(struct hdfsBuilder)); + ret->libhdfsppBuilder = libhdfspp_hdfsNewBuilder(); + ret->libhdfsBuilder = libhdfs_hdfsNewBuilder(); + return ret; } void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) { - libhdfs_hdfsBuilderSetForceNewInstance(bld); + libhdfs_hdfsBuilderSetForceNewInstance(bld->libhdfsBuilder); +// libhdfspp_hdfsBuilderSetForceNewInstance(bld->libhdfsppBuilder); } void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) { - libhdfs_hdfsBuilderSetNameNode(bld, nn); + libhdfs_hdfsBuilderSetNameNode(bld->libhdfsBuilder, nn); + libhdfspp_hdfsBuilderSetNameNode(bld->libhdfsppBuilder, nn); } void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) { - libhdfs_hdfsBuilderSetNameNodePort(bld, port); + libhdfs_hdfsBuilderSetNameNodePort(bld->libhdfsBuilder, port); + libhdfspp_hdfsBuilderSetNameNodePort(bld->libhdfsppBuilder, port); } void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) { - libhdfs_hdfsBuilderSetUserName(bld, userName); + libhdfs_hdfsBuilderSetUserName(bld->libhdfsBuilder, userName); + libhdfspp_hdfsBuilderSetUserName(bld->libhdfsppBuilder, userName); } void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, const char *kerbTicketCachePath) { - libhdfs_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); + libhdfs_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsBuilder, kerbTicketCachePath); +// libhdfspp_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsppBuilder, kerbTicketCachePath); } void hdfsFreeBuilder(struct hdfsBuilder *bld) { - libhdfs_hdfsFreeBuilder(bld); + libhdfs_hdfsFreeBuilder(bld->libhdfsBuilder); + libhdfspp_hdfsFreeBuilder(bld->libhdfsppBuilder); + free(bld); } int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, const char *val) { - return libhdfs_hdfsBuilderConfSetStr(bld, key, val); + fprintf(stderr, "hdfs_shim::hdfsBuilderConfSetStr) key=%s val=%s\n", key, val); + libhdfs_hdfsBuilderConfSetStr(bld->libhdfsBuilder, key, val); + return libhdfspp_hdfsBuilderConfSetStr(bld->libhdfsppBuilder, key, val); } int hdfsConfGetStr(const char *key, char **val) { - return libhdfs_hdfsConfGetStr(key, val); + return libhdfspp_hdfsConfGetStr(key, val); } int hdfsConfGetInt(const char *key, int32_t *val) { - return libhdfs_hdfsConfGetInt(key, val); + return libhdfspp_hdfsConfGetInt(key, val); } void hdfsConfStrFree(char *val) { - libhdfs_hdfsConfStrFree(val); + libhdfspp_hdfsConfStrFree(val); } int hdfsDisconnect(hdfsFS fs) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h index 0d50fdaf1c..7aa33e6dca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h @@ -81,6 +81,7 @@ #define hadoopRzBufferLength libhdfspp_hadoopRzBufferLength #define hadoopRzBufferGet libhdfspp_hadoopRzBufferGet #define hadoopRzBufferFree libhdfspp_hadoopRzBufferFree +#define hdfsBuilder libhdfspp_hdfsBuilder #define hdfs_internal libhdfspp_hdfs_internal #define hdfsFS libhdfspp_hdfsFS #define hdfsFile_internal libhdfspp_hdfsFile_internal diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index b5f4d9ad7c..defe95d12b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks) }); io_service.run(); ASSERT_TRUE(complete); - ASSERT_EQ(7, callbacks.size()); + ASSERT_EQ(8, callbacks.size()); ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error