diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml index 0574b25f56..39372e62ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml @@ -147,7 +147,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> - + - + diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c index 6938109d53..b36ef76e62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c @@ -182,16 +182,6 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) } (*env)->DeleteLocalRef(env, val.l); } - if (conf->numDataNodes) { - jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, - "numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes); - if (jthr) { - printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " - "Builder::numDataNodes"); - goto error; - } - } - (*env)->DeleteLocalRef(env, val.l); jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "build", "()L" MINIDFS_CLUSTER ";"); if (jthr) { @@ -301,7 +291,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, jthrowable jthr; int ret = 0; const char *host; - + if (!env) { fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n"); return -EIO; @@ -316,7 +306,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, return -EIO; } jNameNode = jVal.l; - + // Then get the http address (InetSocketAddress) of the NameNode jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE, "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";"); @@ -327,7 +317,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, goto error_dlr_nn; } jAddress = jVal.l; - + jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, "getPort", "()I"); if (jthr) { @@ -337,7 +327,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, goto error_dlr_addr; } *port = jVal.i; - + jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;"); if (jthr) { @@ -349,12 +339,12 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, host = (*env)->GetStringUTFChars(env, jVal.l, NULL); *hostName = strdup(host); (*env)->ReleaseStringUTFChars(env, jVal.l, host); - + error_dlr_addr: (*env)->DeleteLocalRef(env, jAddress); error_dlr_nn: (*env)->DeleteLocalRef(env, jNameNode); - + return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h index 628180f087..ce8b1cfdab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h @@ -26,7 +26,7 @@ extern "C" { #endif struct hdfsBuilder; -struct NativeMiniDfsCluster; +struct NativeMiniDfsCluster; /** * Represents a configuration to use for creating a Native MiniDFSCluster @@ -51,11 +51,6 @@ struct NativeMiniDfsConf { * Nonzero if we should configure short circuit. */ jboolean configureShortCircuit; - - /** - * The number of datanodes in MiniDfsCluster - */ - jint numDataNodes; }; /** @@ -101,13 +96,13 @@ void nmdFree(struct NativeMiniDfsCluster* cl); * * @return the port, or a negative error code */ -int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); +int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); /** * Get the http address that's in use by the given (non-HA) nativeMiniDfs * * @param cl The initialized NativeMiniDfsCluster - * @param port Used to capture the http port of the NameNode + * @param port Used to capture the http port of the NameNode * of the NativeMiniDfsCluster * @param hostName Used to capture the http hostname of the NameNode * of the NativeMiniDfsCluster diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c deleted file mode 100644 index 71db8eed34..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c +++ /dev/null @@ -1,338 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "expect.h" -#include "hdfs/hdfs.h" -#include "hdfspp/hdfs_ext.h" -#include "native_mini_dfs.h" -#include "os/thread.h" - -#include -#include -#include -#include -#include -#include - -#define TO_STR_HELPER(X) #X -#define TO_STR(X) TO_STR_HELPER(X) - -#define TLH_MAX_THREADS 10000 - -#define TLH_MAX_DNS 16 - -#define TLH_DEFAULT_BLOCK_SIZE 1048576 - -#define TLH_DEFAULT_DFS_REPLICATION 3 - -#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100 - -#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5 - -#ifndef RANDOM_ERROR_RATIO -#define RANDOM_ERROR_RATIO 1000000000 -#endif - -struct tlhThreadInfo { - /** Thread index */ - int threadIdx; - /** 0 = thread was successful; error code otherwise */ - int success; - /** thread identifier */ - thread theThread; - /** fs, shared with other threads **/ - hdfsFS hdfs; - /** Filename */ - const char *fileNm; - -}; - -static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs, - const char *username) -{ - int ret; - tPort port; - hdfsFS hdfs; - struct hdfsBuilder *bld; - - port = (tPort)nmdGetNameNodePort(cl); - if (port < 0) { - fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort " - "returned error %d\n", port); - return port; - } - bld = hdfsNewBuilder(); - if (!bld) - return -ENOMEM; - hdfsBuilderSetForceNewInstance(bld); - hdfsBuilderSetNameNode(bld, "localhost"); - hdfsBuilderSetNameNodePort(bld, port); - hdfsBuilderConfSetStr(bld, "dfs.block.size", - TO_STR(TLH_DEFAULT_BLOCK_SIZE)); - hdfsBuilderConfSetStr(bld, "dfs.blocksize", - TO_STR(TLH_DEFAULT_BLOCK_SIZE)); - hdfsBuilderConfSetStr(bld, "dfs.replication", - TO_STR(TLH_DEFAULT_DFS_REPLICATION)); - hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries", - TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES)); - hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval", - TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS)); - if (username) { - hdfsBuilderSetUserName(bld, username); - } - hdfs = hdfsBuilderConnect(bld); - if (!hdfs) { - ret = -errno; - return ret; - } - *fs = hdfs; - return 0; -} - -static int hdfsWriteData(hdfsFS hdfs, const char *dirNm, - const char *fileNm, tSize fileSz) -{ - hdfsFile file; - int ret, expected; - const char *content; - - content = fileNm; - - if (hdfsExists(hdfs, dirNm) == 0) { - EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1)); - } - EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm)); - - file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0); - EXPECT_NONNULL(file); - - expected = (int)strlen(content); - tSize sz = 0; - while (sz < fileSz) { - ret = hdfsWrite(hdfs, file, content, expected); - if (ret < 0) { - ret = errno; - fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret); - return ret; - } - if (ret != expected) { - fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but " - "it wrote %d\n", ret, expected); - return EIO; - } - sz += ret; - } - EXPECT_ZERO(hdfsFlush(hdfs, file)); - EXPECT_ZERO(hdfsHSync(hdfs, file)); - EXPECT_ZERO(hdfsCloseFile(hdfs, file)); - return 0; -} - -static int fileEventCallback(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie) -{ - char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO"); - int64_t randomErrRatio = RANDOM_ERROR_RATIO; - if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr); - if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR; - else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK; - return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK; -} - -static int doTestHdfsMiniStress(struct tlhThreadInfo *ti) -{ - char tmp[4096]; - hdfsFile file; - int ret, expected; - hdfsFileInfo *fileInfo; - uint64_t readOps, nErrs=0; - tOffset seekPos; - const char *content; - - content = ti->fileNm; - expected = (int)strlen(content); - - fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm); - EXPECT_NONNULL(fileInfo); - - file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0); - EXPECT_NONNULL(file); - - libhdfspp_file_event_callback callback = &fileEventCallback; - - hdfsPreAttachFileMonitor(callback, 0); - - fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n", - ti->threadIdx); - for (readOps=0; readOps < 1000; ++readOps) { - EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file)); - file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0); - EXPECT_NONNULL(file); - seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected); - seekPos = (seekPos / expected) * expected; - ret = hdfsSeek(ti->hdfs, file, seekPos); - if (ret < 0) { - ret = errno; - fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set" - " errno %d\n", seekPos, ret); - ++nErrs; - continue; - } - ret = hdfsRead(ti->hdfs, file, tmp, expected); - if (ret < 0) { - ret = errno; - fprintf(stderr, "hdfsRead failed and set errno %d\n", ret); - ++nErrs; - continue; - } - if (ret != expected) { - fprintf(stderr, "hdfsRead was supposed to read %d bytes, but " - "it read %d\n", ret, expected); - ++nErrs; - continue; - } - ret = memcmp(content, tmp, expected); - if (ret) { - fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)", - expected, tmp, expected, content); - ++nErrs; - continue; - } - } - EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file)); - fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n", - ti->threadIdx); - EXPECT_ZERO(nErrs); - return 0; -} - -static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti) -{ - fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n", - ti->threadIdx); - EXPECT_NONNULL(ti->hdfs); - EXPECT_ZERO(doTestHdfsMiniStress(ti)); - return 0; -} - -static void testHdfsMiniStress(void *v) -{ - struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v; - int ret = testHdfsMiniStressImpl(ti); - ti->success = ret; -} - -static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads) -{ - int i, threadsFailed = 0; - const char *sep = ""; - - for (i = 0; i < tlhNumThreads; i++) { - if (ti[i].success != 0) { - threadsFailed = 1; - } - } - if (!threadsFailed) { - fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded. SUCCESS.\n"); - return EXIT_SUCCESS; - } - fprintf(stderr, "testLibHdfsMiniStress: some threads failed: ["); - for (i = 0; i < tlhNumThreads; i++) { - if (ti[i].success != 0) { - fprintf(stderr, "%s%d", sep, i); - sep = ", "; - } - } - fprintf(stderr, "]. FAILURE.\n"); - return EXIT_FAILURE; -} - -/** - * Test intended to stress libhdfs client with concurrent requests. Currently focused - * on concurrent reads. - */ -int main(void) -{ - int i, tlhNumThreads; - char *dirNm, *fileNm; - tSize fileSz; - const char *tlhNumThreadsStr, *tlhNumDNsStr; - hdfsFS hdfs = NULL; - struct NativeMiniDfsCluster* tlhCluster; - struct tlhThreadInfo ti[TLH_MAX_THREADS]; - struct NativeMiniDfsConf conf = { - 1, /* doFormat */ - }; - - dirNm = "/tlhMiniStressData"; - fileNm = "/tlhMiniStressData/file"; - fileSz = 2*1024*1024; - - tlhNumDNsStr = getenv("TLH_NUM_DNS"); - if (!tlhNumDNsStr) { - tlhNumDNsStr = "1"; - } - conf.numDataNodes = atoi(tlhNumDNsStr); - if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) { - fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes " - "between 1 and %d inclusive, not %d\n", - TLH_MAX_DNS, conf.numDataNodes); - return EXIT_FAILURE; - } - - tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); - if (!tlhNumThreadsStr) { - tlhNumThreadsStr = "8"; - } - tlhNumThreads = atoi(tlhNumThreadsStr); - if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) { - fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads " - "between 1 and %d inclusive, not %d\n", - TLH_MAX_THREADS, tlhNumThreads); - return EXIT_FAILURE; - } - memset(&ti[0], 0, sizeof(ti)); - for (i = 0; i < tlhNumThreads; i++) { - ti[i].threadIdx = i; - } - - tlhCluster = nmdCreate(&conf); - EXPECT_NONNULL(tlhCluster); - EXPECT_ZERO(nmdWaitClusterUp(tlhCluster)); - - EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL)); - - // Single threaded writes for now. - EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz)); - - // Multi-threaded reads. - for (i = 0; i < tlhNumThreads; i++) { - ti[i].theThread.start = testHdfsMiniStress; - ti[i].theThread.arg = &ti[i]; - ti[i].hdfs = hdfs; - ti[i].fileNm = fileNm; - EXPECT_ZERO(threadCreate(&ti[i].theThread)); - } - for (i = 0; i < tlhNumThreads; i++) { - EXPECT_ZERO(threadJoin(&ti[i].theThread)); - } - - EXPECT_ZERO(hdfsDisconnect(hdfs)); - EXPECT_ZERO(nmdShutdown(tlhCluster)); - nmdFree(tlhCluster); - return checkFailures(ti, tlhNumThreads); -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 3f19ae3743..0fe84795c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -298,9 +298,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, Error(stat); return nullptr; } - if (f && fileEventCallback) { - f->SetFileEventCallback(fileEventCallback.value()); - } return new hdfsFile_internal(f); } catch (const std::exception & e) { ReportException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index 9f9311f16a..471281aa5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -231,7 +231,7 @@ void FileHandleImpl::AsyncPreadSome( // Wrap the DN in a block reader to handle the state and logic of the // block request protocol std::shared_ptr reader; - reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_); + reader = CreateBlockReader(BlockReaderOptions(), dn); // Lambdas cannot capture copies of member variables so we'll make explicit // copies for it @@ -240,7 +240,7 @@ void FileHandleImpl::AsyncPreadSome( auto cluster_name = cluster_name_; auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { - event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); + auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { handler(event_resp.status(), dn_id, transferred); @@ -254,7 +254,7 @@ void FileHandleImpl::AsyncPreadSome( auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] (Status status, std::shared_ptr dn) { (void)dn; - event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); + auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { status = event_resp.status(); @@ -276,10 +276,9 @@ void FileHandleImpl::AsyncPreadSome( } std::shared_ptr FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn, - std::shared_ptr event_handlers) + std::shared_ptr dn) { - std::shared_ptr reader = std::make_shared(options, dn, cancel_state_, event_handlers); + std::shared_ptr reader = std::make_shared(options, dn, cancel_state_); LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR << ", ..., dnconn=" << dn.get() diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 57cf4b77a9..a99550a13c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -119,8 +119,7 @@ public: protected: virtual std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn, - std::shared_ptr event_handlers); + std::shared_ptr dn); virtual std::shared_ptr CreateDataNodeConnection( ::asio::io_service *io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 0d4be41eca..b05bc3d541 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -96,7 +96,7 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n io_service = nullptr; /* spawn background threads for asio delegation */ - unsigned int threads = 2 /* options.io_threads_, pending HDFS-9117 */; + unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */; for (unsigned int i = 0; i < threads; i++) { AddWorkerThread(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index defcc1a143..50529511d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -24,6 +24,7 @@ #include + namespace hdfs { #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ @@ -104,17 +105,7 @@ void BlockReaderImpl::AsyncRequestBlock( m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; if (stat.ok()) { const auto &resp = s.response; - - if(this->event_handlers_) { - event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef NDEBUG - if (stat.ok() && event_resp.response() == event_response::kTest_Error) { - stat = Status::Error("Test error"); - } -#endif - } - - if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) { + if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { if (resp.has_readopchecksuminfo()) { const auto &checksum_info = resp.readopchecksuminfo(); chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); @@ -171,14 +162,6 @@ struct BlockReaderImpl::ReadPacketHeader assert(v && "Failed to parse the header"); parent_->state_ = kReadChecksum; } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef NDEBUG - if (status.ok() && event_resp.response() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } next(status); }; @@ -231,7 +214,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { return; } - auto handler = [parent, next, this](const asio::error_code &ec, size_t) { + auto handler = [parent, next](const asio::error_code &ec, size_t) { Status status; if (ec) { status = Status(ec.value(), ec.message().c_str()); @@ -239,14 +222,6 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation { parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData; } - if(parent->event_handlers_) { - event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef NDEBUG - if (status.ok() && event_resp.response() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } next(status); }; parent->checksum_.resize(parent->packet_len_ - sizeof(int) - @@ -273,6 +248,7 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { virtual void Run(const Next &next) override { LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); + auto handler = [next, this](const asio::error_code &ec, size_t transferred) { Status status; @@ -285,14 +261,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation { if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { parent_->state_ = kReadPacketHeader; } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef NDEBUG - if (status.ok() && event_resp.response() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } next(status); }; @@ -324,22 +292,13 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation { return; } - auto h = [next, this](const Status &stat) { - Status status = stat; + auto h = [next, this](const Status &status) { if (status.ok()) { assert(reinterpret_cast(*bytes_transferred_) == parent_->chunk_padding_bytes_); parent_->chunk_padding_bytes_ = 0; parent_->state_ = kReadData; } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef NDEBUG - if (status.ok() && event_resp.response() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } next(status); }; read_data_->Run(h); @@ -375,20 +334,11 @@ struct BlockReaderImpl::AckRead : continuation::Continuation { m->Push( continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); - m->Run([this, next](const Status &stat, + m->Run([this, next](const Status &status, const hadoop::hdfs::ClientReadStatusProto &) { - Status status = stat; if (status.ok()) { parent_->state_ = BlockReaderImpl::kFinished; } - if(parent_->event_handlers_) { - event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); -#ifndef NDEBUG - if (status.ok() && event_resp.response() == event_response::kTest_Error) { - status = Status::Error("Test error"); - } -#endif - } next(status); }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index b5cbdf5a75..f9794b1e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -93,9 +93,9 @@ class BlockReaderImpl : public BlockReader, public std::enable_shared_from_this { public: explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr dn, - CancelHandle cancel_state, std::shared_ptr event_handlers=nullptr) + CancelHandle cancel_state) : dn_(dn), state_(kOpen), options_(options), - chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} + chunk_padding_bytes_(0), cancel_state_(cancel_state) {} virtual void AsyncReadPacket( const MutableBuffers &buffers, @@ -152,7 +152,6 @@ private: long long bytes_to_read_; std::vector checksum_; CancelHandle cancel_state_; - LibhdfsEvents* event_handlers_; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index a72d194a41..749195a278 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -274,18 +274,9 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr response) { } Status status; - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); -#ifndef NDEBUG - if (event_resp.response() == event_response::kTest_Error) { - status = event_resp.status(); - } -#endif - } - - if (status.ok() && h.has_exceptionclassname()) { + if (h.has_exceptionclassname()) { status = - Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); + Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); } io_service().post([req, response, status]() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 70a96b0399..255b98be80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -30,8 +30,6 @@ #include #include -#include - namespace hdfs { template @@ -65,10 +63,6 @@ public: NextLayer next_layer_; void ConnectComplete(const ::asio::error_code &ec); - - // Hide default ctors. - RpcConnectionImpl(); - RpcConnectionImpl(const RpcConnectionImpl &other); }; template @@ -76,7 +70,7 @@ RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), next_layer_(engine->io_service()) { - LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); + LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); } template @@ -90,6 +84,7 @@ RpcConnectionImpl::~RpcConnectionImpl() { LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } + template void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> &server, @@ -150,7 +145,7 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) Status status = ToStatus(ec); if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); + auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { status = event_resp.status(); @@ -315,28 +310,27 @@ void RpcConnectionImpl::FlushPendingRequests() { template -void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &original_ec, +void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &asio_ec, size_t) { using std::placeholders::_1; using std::placeholders::_2; std::lock_guard state_lock(connection_state_lock_); - ::asio::error_code my_ec(original_ec); - LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); std::shared_ptr shared_this = shared_from_this(); + ::asio::error_code ec = asio_ec; if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); + auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); #ifndef NDEBUG if (event_resp.response() == event_response::kTest_Error) { - my_ec = std::make_error_code(std::errc::network_down); + ec = std::make_error_code(std::errc::network_down); } #endif } - switch (my_ec.value()) { + switch (ec.value()) { case 0: // No errors break; @@ -344,8 +338,8 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ori // The event loop has been shut down. Ignore the error. return; default: - LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message()); - CommsError(ToStatus(my_ec)); + LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message()); + CommsError(ToStatus(ec)); return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 5f7e618e32..066c01f56e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -229,6 +229,7 @@ class RpcConnection : public std::enable_shared_from_this { std::shared_ptr event_handlers_; std::string cluster_name_; + // Lock for mutable parts of this class that need to be thread safe std::mutex connection_state_lock_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 45bbeb25de..b30afb9cdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -138,10 +138,6 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static) -build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c) -link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) -add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static) - build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc) link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfs_ext hdfspp_test_shim_static) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 9e3aeb74f3..01d723f281 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -93,10 +93,9 @@ public: std::shared_ptr mock_reader_ = std::make_shared(); protected: std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn, - std::shared_ptr event_handlers) override + std::shared_ptr dn) override { - (void) options; (void) dn; (void) event_handlers; + (void) options; (void) dn; assert(mock_reader_); return mock_reader_; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c index 7613bf31df..0737d0816f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c @@ -24,9 +24,15 @@ #include #include +/* Cheat for now and use the same hdfsBuilder as libhdfs */ +/* (libhdfspp doesn't have an hdfsBuilder yet). */ struct hdfsBuilder { - struct libhdfs_hdfsBuilder * libhdfsBuilder; - struct libhdfspp_hdfsBuilder * libhdfsppBuilder; + int forceNewInstance; + const char *nn; + tPort port; + const char *kerbTicketCachePath; + const char *userName; + struct hdfsBuilderConfOpt *opts; }; /* Shim structs and functions that delegate to libhdfspp and libhdfs. */ @@ -92,13 +98,13 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { hdfsFS ret = calloc(1, sizeof(struct hdfs_internal)); - ret->libhdfsppRep = libhdfspp_hdfsBuilderConnect(bld->libhdfsppBuilder); + ret->libhdfsppRep = libhdfspp_hdfsConnect(bld->nn, bld->port); if (!ret->libhdfsppRep) { free(ret); ret = NULL; } else { /* Destroys bld object. */ - ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld->libhdfsBuilder); + ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld); if (!ret->libhdfsRep) { libhdfspp_hdfsDisconnect(ret->libhdfsppRep); free(ret); @@ -109,61 +115,49 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { } struct hdfsBuilder *hdfsNewBuilder(void) { - struct hdfsBuilder * ret = calloc(1, sizeof(struct hdfsBuilder)); - ret->libhdfsppBuilder = libhdfspp_hdfsNewBuilder(); - ret->libhdfsBuilder = libhdfs_hdfsNewBuilder(); - return ret; + return libhdfs_hdfsNewBuilder(); } void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) { - libhdfs_hdfsBuilderSetForceNewInstance(bld->libhdfsBuilder); -// libhdfspp_hdfsBuilderSetForceNewInstance(bld->libhdfsppBuilder); + libhdfs_hdfsBuilderSetForceNewInstance(bld); } void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) { - libhdfs_hdfsBuilderSetNameNode(bld->libhdfsBuilder, nn); - libhdfspp_hdfsBuilderSetNameNode(bld->libhdfsppBuilder, nn); + libhdfs_hdfsBuilderSetNameNode(bld, nn); } void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) { - libhdfs_hdfsBuilderSetNameNodePort(bld->libhdfsBuilder, port); - libhdfspp_hdfsBuilderSetNameNodePort(bld->libhdfsppBuilder, port); + libhdfs_hdfsBuilderSetNameNodePort(bld, port); } void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) { - libhdfs_hdfsBuilderSetUserName(bld->libhdfsBuilder, userName); - libhdfspp_hdfsBuilderSetUserName(bld->libhdfsppBuilder, userName); + libhdfs_hdfsBuilderSetUserName(bld, userName); } void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, const char *kerbTicketCachePath) { - libhdfs_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsBuilder, kerbTicketCachePath); -// libhdfspp_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsppBuilder, kerbTicketCachePath); + libhdfs_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); } void hdfsFreeBuilder(struct hdfsBuilder *bld) { - libhdfs_hdfsFreeBuilder(bld->libhdfsBuilder); - libhdfspp_hdfsFreeBuilder(bld->libhdfsppBuilder); - free(bld); + libhdfs_hdfsFreeBuilder(bld); } int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, const char *val) { - fprintf(stderr, "hdfs_shim::hdfsBuilderConfSetStr) key=%s val=%s\n", key, val); - libhdfs_hdfsBuilderConfSetStr(bld->libhdfsBuilder, key, val); - return libhdfspp_hdfsBuilderConfSetStr(bld->libhdfsppBuilder, key, val); + return libhdfs_hdfsBuilderConfSetStr(bld, key, val); } int hdfsConfGetStr(const char *key, char **val) { - return libhdfspp_hdfsConfGetStr(key, val); + return libhdfs_hdfsConfGetStr(key, val); } int hdfsConfGetInt(const char *key, int32_t *val) { - return libhdfspp_hdfsConfGetInt(key, val); + return libhdfs_hdfsConfGetInt(key, val); } void hdfsConfStrFree(char *val) { - libhdfspp_hdfsConfStrFree(val); + libhdfs_hdfsConfStrFree(val); } int hdfsDisconnect(hdfsFS fs) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h index 7aa33e6dca..0d50fdaf1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h @@ -81,7 +81,6 @@ #define hadoopRzBufferLength libhdfspp_hadoopRzBufferLength #define hadoopRzBufferGet libhdfspp_hadoopRzBufferGet #define hadoopRzBufferFree libhdfspp_hadoopRzBufferFree -#define hdfsBuilder libhdfspp_hdfsBuilder #define hdfs_internal libhdfspp_hdfs_internal #define hdfsFS libhdfspp_hdfsFS #define hdfsFile_internal libhdfspp_hdfsFile_internal diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index defe95d12b..b5f4d9ad7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks) }); io_service.run(); ASSERT_TRUE(complete); - ASSERT_EQ(8, callbacks.size()); + ASSERT_EQ(7, callbacks.size()); ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error