From 9e929a7a0ddc3de1eba6c32e55ddac642d5214be Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Wed, 16 Sep 2015 17:48:24 -0700 Subject: [PATCH] HDFS-9095. RPC client should fail gracefully when the connection is timed out or reset. Contributed by Haohui Mai. --- .../hadoop-hdfs-client/pom.xml | 10 - .../src/main/native/libhdfspp/CMakeLists.txt | 4 + .../native/libhdfspp/include/libhdfspp/hdfs.h | 3 +- .../libhdfspp/include/libhdfspp/options.h | 35 ++++ .../libhdfspp/lib/common/CMakeLists.txt | 2 +- .../native/libhdfspp/lib/common/logging.h | 61 ++++++ .../native/libhdfspp/lib/common/options.cc | 27 +++ .../native/libhdfspp/lib/fs/filesystem.cc | 13 +- .../main/native/libhdfspp/lib/fs/filesystem.h | 2 +- .../native/libhdfspp/lib/proto/CMakeLists.txt | 32 ++-- .../libhdfspp/lib/rpc/rpc_connection.cc | 58 +++++- .../native/libhdfspp/lib/rpc/rpc_connection.h | 37 ++-- .../native/libhdfspp/lib/rpc/rpc_engine.cc | 18 +- .../native/libhdfspp/lib/rpc/rpc_engine.h | 25 ++- .../native/libhdfspp/tests/CMakeLists.txt | 12 ++ .../libhdfspp/tests/inputstream_test.cc | 12 +- .../native/libhdfspp/tests/mock_connection.h | 1 + .../native/libhdfspp/tests/rpc_engine_test.cc | 179 ++++++++++++++++++ 18 files changed, 456 insertions(+), 75 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/options.h create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml index a4df099c70..9090575b18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml @@ -190,16 +190,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> org.apache.maven.plugins maven-antrun-plugin - - debug - compile - run - - - [PROTOC] ${env.HADOOP_PROTOC_PATH} - - - make compile diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt index bd5558d89e..8063f8671a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt @@ -50,6 +50,10 @@ include_directories( third_party/gmock-1.7.0 ) +set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../proto) +set(PROTO_HADOOP_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../hadoop-common-project/hadoop-common/src/main/proto) +set(PROTO_HADOOP_TEST_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../hadoop-common-project/hadoop-common/src/test/proto) + add_subdirectory(third_party/gmock-1.7.0) add_subdirectory(lib) add_subdirectory(tests) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h index edb1ff3e09..a3b185392c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -18,6 +18,7 @@ #ifndef LIBHDFSPP_HDFS_H_ #define LIBHDFSPP_HDFS_H_ +#include "libhdfspp/options.h" #include "libhdfspp/status.h" #include @@ -89,7 +90,7 @@ 
public:
  * FileSystem object.
  **/
   static void
-  New(IoService *io_service, const std::string &server,
+  New(IoService *io_service, const Options &options, const std::string &server,
       const std::string &service,
       const std::function<void(const Status &, FileSystem *)> &handler);
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/options.h
new file mode 100644
index 0000000000..c39d04e925
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/options.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_OPTIONS_H_
+#define LIBHDFSPP_OPTIONS_H_
+
+namespace hdfs {
+
+/**
+ * Options to control the behavior of the libhdfspp library.
+ **/
+struct Options {
+  /**
+   * Time out of RPC requests in milliseconds.
+   * Default: 30000
+   **/
+  int rpc_timeout;
+  Options();
+};
+}
+#endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 06b4ca3817..b03f00b44e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -1 +1 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc)
+add_library(common base64.cc options.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h
new file mode 100644
index 0000000000..82bdae01f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_LOGGING_H_
+#define LIB_COMMON_LOGGING_H_
+
+#include <iostream>
+
+namespace hdfs {
+
+enum LogLevel {
+  kDebug,
+  kInfo,
+  kWarning,
+  kError,
+};
+
+#define LOG_DEBUG() LogMessage(kDebug)
+#define LOG_INFO() LogMessage(kInfo)
+#define LOG_WARN() LogMessage(kWarning)
+#define LOG_ERROR() LogMessage(kError)
+
+class LogMessage {
+ public:
+  LogMessage(const LogLevel &l) {
+    static constexpr const char * kLogLevelMessage[] = {"DEBUG", "INFO", "WARN", "ERROR"};
+    ::std::cerr << "[" << kLogLevelMessage[(size_t)l] << "] ";
+  }
+
+  ~LogMessage() {
+    ::std::cerr << std::endl;
+  }
+
+  LogMessage& operator<<(const std::string& msg) {
+    ::std::cerr << msg;
+    return *this;
+  }
+  LogMessage& operator<<(int x) {
+    ::std::cerr << x;
+    return *this;
+  }
+};
+
+}
+
+#endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc
new file mode 100644
index 0000000000..529fd0b395
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "libhdfspp/options.h" + +namespace hdfs { + +Options::Options() + : rpc_timeout(30000) +{} + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 0090fd57c5..0b958a828e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -35,10 +35,10 @@ using ::asio::ip::tcp; FileSystem::~FileSystem() {} void FileSystem::New( - IoService *io_service, const std::string &server, + IoService *io_service, const Options &options, const std::string &server, const std::string &service, const std::function &handler) { - FileSystemImpl *impl = new FileSystemImpl(io_service); + FileSystemImpl *impl = new FileSystemImpl(io_service, options); impl->Connect(server, service, [impl, handler](const Status &stat) { if (stat.ok()) { handler(stat, impl); @@ -49,10 +49,11 @@ void FileSystem::New( }); } -FileSystemImpl::FileSystemImpl(IoService *io_service) +FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options) : io_service_(static_cast(io_service)), - engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(), - kNamenodeProtocol, kNamenodeProtocolVersion), + engine_(&io_service_->io_service(), options, + RpcEngine::GetRandomClientName(), kNamenodeProtocol, + kNamenodeProtocolVersion), namenode_(&engine_) {} void FileSystemImpl::Connect(const std::string &server, @@ -64,7 +65,7 @@ void FileSystemImpl::Connect(const std::string &server, m->Push(Resolve(&io_service_->io_service(), server, service, std::back_inserter(m->state()))) .Push(Bind([this, m](const Continuation::Next &next) { - engine_.Connect(m->state(), next); + engine_.Connect(m->state().front(), next); })); m->Run([this, handler](const Status &status, const State &) { if (status.ok()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 0c9dd7af5b..72f80b7b34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -28,7 +28,7 @@ namespace hdfs { class FileSystemImpl : public FileSystem { public: - FileSystemImpl(IoService *io_service); + FileSystemImpl(IoService *io_service, const Options &options); void Connect(const std::string &server, const std::string &service, std::function &&handler); virtual void Open(const std::string &path, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt index 4b0bac69eb..d5820fd9b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt @@ -1,21 +1,19 @@ -set(CLIENT_PROTO_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../proto) -set(COMMON_PROTO_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../../../hadoop-common-project/hadoop-common/src/main/proto) -set(PROTOBUF_IMPORT_DIRS ${CLIENT_PROTO_DIR} ${COMMON_PROTO_DIR}) +set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR}) protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS - 
${CLIENT_PROTO_DIR}/datatransfer.proto - ${CLIENT_PROTO_DIR}/ClientDatanodeProtocol.proto - ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto - ${CLIENT_PROTO_DIR}/acl.proto - ${CLIENT_PROTO_DIR}/datatransfer.proto - ${CLIENT_PROTO_DIR}/encryption.proto - ${CLIENT_PROTO_DIR}/hdfs.proto - ${CLIENT_PROTO_DIR}/inotify.proto - ${CLIENT_PROTO_DIR}/xattr.proto - ${COMMON_PROTO_DIR}/IpcConnectionContext.proto - ${COMMON_PROTO_DIR}/ProtobufRpcEngine.proto - ${COMMON_PROTO_DIR}/RpcHeader.proto - ${COMMON_PROTO_DIR}/Security.proto + ${PROTO_HDFS_DIR}/datatransfer.proto + ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto + ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto + ${PROTO_HDFS_DIR}/acl.proto + ${PROTO_HDFS_DIR}/datatransfer.proto + ${PROTO_HDFS_DIR}/encryption.proto + ${PROTO_HDFS_DIR}/hdfs.proto + ${PROTO_HDFS_DIR}/inotify.proto + ${PROTO_HDFS_DIR}/xattr.proto + ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto + ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto + ${PROTO_HADOOP_DIR}/RpcHeader.proto + ${PROTO_HADOOP_DIR}/Security.proto ) add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc) @@ -59,7 +57,7 @@ function(GEN_HRPC SRCS) endfunction() gen_hrpc(HRPC_SRCS - ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto + ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto ) add_library(proto ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS}) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index 555c37f40f..8c4130f8c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -21,6 +21,7 @@ #include "ProtobufRpcEngine.pb.h" #include "IpcConnectionContext.pb.h" +#include "common/logging.h" #include "common/util.h" #include @@ -57,7 +58,6 @@ ConstructPacket(std::string *res, os.WriteRaw(reinterpret_cast(&net_len), sizeof(net_len)); uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - assert(buf && "Cannot allocate memory"); std::for_each( headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { @@ -146,14 +146,12 @@ void RpcConnection::HandleRpcResponse(const std::vector &data) { RpcResponseHeaderProto h; ReadDelimitedPBMessage(&in, &h); - auto it = requests_on_fly_.find(h.callid()); - if (it == requests_on_fly_.end()) { - // TODO: out of line RPC request - assert(false && "Out of line request with unknown call id"); + auto req = RemoveFromRunningQueue(h.callid()); + if (!req) { + LOG_WARN() << "RPC response with Unknown call id " << h.callid(); + return; } - auto req = it->second; - requests_on_fly_.erase(it); Status stat; if (h.has_exceptionclassname()) { stat = @@ -162,6 +160,24 @@ void RpcConnection::HandleRpcResponse(const std::vector &data) { req->OnResponseArrived(&in, stat); } +void RpcConnection::HandleRpcTimeout(std::shared_ptr req, + const ::asio::error_code &ec) { + if (ec.value() == asio::error::operation_aborted) { + return; + } + + std::lock_guard state_lock(engine_state_lock_); + auto r = RemoveFromRunningQueue(req->call_id()); + if (!r) { + // The RPC might have been finished and removed from the queue + return; + } + + Status stat = ToStatus(ec ? 
ec : make_error_code(::asio::error::timed_out)); + + r->OnResponseArrived(nullptr, stat); +} + std::shared_ptr RpcConnection::PrepareHandshakePacket() { static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c', RpcEngine::kRpcVersion, 0, 0}; @@ -223,4 +239,32 @@ void RpcConnection::AsyncRawRpc(const std::string &method_name, pending_requests_.push_back(r); FlushPendingRequests(); } + +void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { + Shutdown(); + std::vector> requests; + std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), + std::back_inserter(requests), + std::bind(&RequestOnFlyMap::value_type::second, _1)); + requests_on_fly_.clear(); + requests.insert(requests.end(), + std::make_move_iterator(pending_requests_.begin()), + std::make_move_iterator(pending_requests_.end())); + pending_requests_.clear(); + for (const auto &req : requests) { + req->OnResponseArrived(nullptr, ToStatus(ec)); + } +} + +std::shared_ptr +RpcConnection::RemoveFromRunningQueue(int call_id) { + auto it = requests_on_fly_.find(call_id); + if (it == requests_on_fly_.end()) { + return std::shared_ptr(); + } + + auto req = it->second; + requests_on_fly_.erase(it); + return req; +} } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index a8eecf4b09..439a7307e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -19,6 +19,8 @@ #define LIB_RPC_RPC_CONNECTION_H_ #include "rpc_engine.h" + +#include "common/logging.h" #include "common/util.h" #include @@ -30,7 +32,7 @@ namespace hdfs { template class RpcConnectionImpl : public RpcConnection { public: RpcConnectionImpl(RpcEngine *engine); - virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + virtual void Connect(const ::asio::ip::tcp::endpoint &server, Callback &&handler) override; virtual void Handshake(Callback &&handler) override; virtual void Shutdown() override; @@ -39,23 +41,22 @@ public: virtual void OnRecvCompleted(const ::asio::error_code &ec, size_t transferred) override; + NextLayer &next_layer() { return next_layer_; } private: + const Options options_; NextLayer next_layer_; }; template RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) - : RpcConnection(engine) - , next_layer_(engine->io_service()) -{} + : RpcConnection(engine), options_(engine->options()), + next_layer_(engine->io_service()) {} template void RpcConnectionImpl::Connect( - const std::vector<::asio::ip::tcp::endpoint> &server, Callback &&handler) { - ::asio::async_connect( - next_layer_, server.begin(), server.end(), - [handler](const ::asio::error_code &ec, - std::vector<::asio::ip::tcp::endpoint>::const_iterator) { + const ::asio::ip::tcp::endpoint &server, Callback &&handler) { + next_layer_.async_connect(server, + [handler](const ::asio::error_code &ec) { handler(ToStatus(ec)); }); } @@ -79,9 +80,10 @@ void RpcConnectionImpl::OnSendCompleted(const ::asio::error_code &ec, request_over_the_wire_.reset(); if (ec) { - // TODO: Current RPC has failed -- we should abandon the + // Current RPC has failed -- abandon the // connection and do proper clean up - assert(false && "Unimplemented"); + ClearAndDisconnect(ec); + return; } if (!pending_requests_.size()) { @@ -93,7 +95,10 @@ void RpcConnectionImpl::OnSendCompleted(const ::asio::error_code 
&ec, requests_on_fly_[req->call_id()] = req; request_over_the_wire_ = req; - // TODO: set the timeout for the RPC request + req->timer().expires_from_now( + std::chrono::milliseconds(options_.rpc_timeout)); + req->timer().async_wait(std::bind( + &RpcConnectionImpl::HandleRpcTimeout, this, req, _1)); asio::async_write( next_layer_, asio::buffer(req->payload()), @@ -115,7 +120,9 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ec, // The event loop has been shut down. Ignore the error. return; default: - assert(false && "Unimplemented"); + LOG_WARN() << "Network error during RPC: " << ec.message(); + ClearAndDisconnect(ec); + return; } if (resp_state_ == kReadLength) { @@ -131,7 +138,8 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ec, resp_length_ = ntohl(resp_length_); resp_data_.resize(resp_length_); asio::async_read(next_layer_, ::asio::buffer(resp_data_), - std::bind(&RpcConnectionImpl::OnRecvCompleted, this, _1, _2)); + std::bind(&RpcConnectionImpl::OnRecvCompleted, + this, _1, _2)); } else if (resp_state_ == kParseResponse) { resp_state_ = kReadLength; @@ -142,6 +150,7 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ec, } template void RpcConnectionImpl::Shutdown() { + next_layer_.cancel(); next_layer_.close(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index a9d6cee659..83721a76c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -26,18 +26,18 @@ namespace hdfs { -RpcEngine::RpcEngine(::asio::io_service *io_service, +RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, const std::string &client_name, const char *protocol_name, int protocol_version) - : io_service_(io_service), client_name_(client_name), + : io_service_(io_service), options_(options), client_name_(client_name), protocol_name_(protocol_name), protocol_version_(protocol_version), - call_id_(0) - , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)) -{} + call_id_(0) { +} -void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers, +void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server, const std::function &handler) { - conn_->Connect(servers, [this, handler](const Status &stat) { + conn_.reset(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)); + conn_->Connect(server, [this, handler](const Status &stat) { if (!stat.ok()) { handler(stat); } else { @@ -52,6 +52,10 @@ void RpcEngine::Shutdown() { io_service_->post([this]() { conn_->Shutdown(); }); } +void RpcEngine::TEST_SetRpcConnection(std::unique_ptr *conn) { + conn_.reset(conn->release()); +} + void RpcEngine::AsyncRpc( const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 9ff6361c81..ee04fd5a93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -18,6 +18,7 @@ #ifndef LIB_RPC_RPC_ENGINE_H_ #define LIB_RPC_RPC_ENGINE_H_ +#include 
"libhdfspp/options.h" #include "libhdfspp/status.h" #include @@ -39,7 +40,7 @@ public: typedef std::function Callback; virtual ~RpcConnection(); RpcConnection(RpcEngine *engine); - virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + virtual void Connect(const ::asio::ip::tcp::endpoint &server, Callback &&handler) = 0; virtual void Handshake(Callback &&handler) = 0; virtual void Shutdown() = 0; @@ -54,6 +55,7 @@ public: std::shared_ptr resp, Callback &&handler); protected: + class Request; RpcEngine *const engine_; virtual void OnSendCompleted(const ::asio::error_code &ec, size_t transferred) = 0; @@ -66,7 +68,11 @@ protected: SerializeRpcRequest(const std::string &method_name, const ::google::protobuf::MessageLite *req); void HandleRpcResponse(const std::vector &data); + void HandleRpcTimeout(std::shared_ptr req, + const ::asio::error_code &ec); void FlushPendingRequests(); + void ClearAndDisconnect(const ::asio::error_code &ec); + std::shared_ptr RemoveFromRunningQueue(int call_id); enum ResponseState { kReadLength, @@ -89,7 +95,8 @@ protected: ::asio::deadline_timer &timer() { return timer_; } const std::string &payload() const { return payload_; } void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, - const Status &status); + const Status &status); + private: const int call_id_; ::asio::deadline_timer timer_; @@ -102,7 +109,8 @@ protected: // Requests to be sent over the wire std::vector> pending_requests_; // Requests that are waiting for responses - std::unordered_map> requests_on_fly_; + typedef std::unordered_map> RequestOnFlyMap; + RequestOnFlyMap requests_on_fly_; // Lock for mutable parts of this class that need to be thread safe std::mutex engine_state_lock_; }; @@ -117,8 +125,9 @@ public: kCallIdPing = -4 }; - RpcEngine(::asio::io_service *io_service, const std::string &client_name, - const char *protocol_name, int protocol_version); + RpcEngine(::asio::io_service *io_service, const Options &options, + const std::string &client_name, const char *protocol_name, + int protocol_version); void AsyncRpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, @@ -134,10 +143,11 @@ public: **/ Status RawRpc(const std::string &method_name, const std::string &req, std::shared_ptr resp); - void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + void Connect(const ::asio::ip::tcp::endpoint &server, const std::function &handler); void Start(); void Shutdown(); + void TEST_SetRpcConnection(std::unique_ptr *conn); int NextCallId() { return ++call_id_; } @@ -145,11 +155,12 @@ public: const std::string &protocol_name() const { return protocol_name_; } int protocol_version() const { return protocol_version_; } ::asio::io_service &io_service() { return *io_service_; } - + const Options &options() { return options_; } static std::string GetRandomClientName(); private: ::asio::io_service *io_service_; + Options options_; const std::string client_name_; const std::string protocol_name_; const int protocol_version_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt index df57d04c0f..eca878e1d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -18,6 +18,13 @@ add_library(test_common OBJECT mock_connection.cc) +set(PROTOBUF_IMPORT_DIRS 
${PROTO_HADOOP_TEST_DIR}) + +protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS + ${PROTO_HADOOP_TEST_DIR}/test.proto + ${PROTO_HADOOP_TEST_DIR}/test_rpc_service.proto +) + add_executable(remote_block_reader_test remote_block_reader_test.cc $) target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(remote_block_reader remote_block_reader_test) @@ -29,3 +36,8 @@ add_test(sasl_digest_md5 sasl_digest_md5_test) add_executable(inputstream_test inputstream_test.cc) target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(inputstream inputstream_test) + +include_directories(${CMAKE_CURRENT_BINARY_DIR}) +add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $) +target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +add_test(rpc_engine rpc_engine_test) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc index 6d87823707..aa95256b7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc @@ -76,7 +76,8 @@ TEST(InputStreamTest, TestReadSingleTrunk) { 0, }; IoServiceImpl io_service; - FileSystemImpl fs(&io_service); + Options options; + FileSystemImpl fs(&io_service, options); InputStreamImpl is(&fs, &blocks); Status stat; size_t read = 0; @@ -109,7 +110,8 @@ TEST(InputStreamTest, TestReadMultipleTrunk) { 0, }; IoServiceImpl io_service; - FileSystemImpl fs(&io_service); + Options options; + FileSystemImpl fs(&io_service, options); InputStreamImpl is(&fs, &blocks); Status stat; size_t read = 0; @@ -144,7 +146,8 @@ TEST(InputStreamTest, TestReadError) { 0, }; IoServiceImpl io_service; - FileSystemImpl fs(&io_service); + Options options; + FileSystemImpl fs(&io_service, options); InputStreamImpl is(&fs, &blocks); Status stat; size_t read = 0; @@ -190,7 +193,8 @@ TEST(InputStreamTest, TestExcludeDataNode) { 0, }; IoServiceImpl io_service; - FileSystemImpl fs(&io_service); + Options options; + FileSystemImpl fs(&io_service, options); InputStreamImpl is(&fs, &blocks); Status stat; size_t read = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h index 086797f16f..8c0ef8cf3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h @@ -38,6 +38,7 @@ public: ProducerResult r = Produce(); if (r.first) { io_service_->post(std::bind(handler, r.first, 0)); + return; } asio::mutable_buffers_1 data = produced_.prepare(r.second.size()); asio::buffer_copy(data, asio::buffer(r.second)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc new file mode 100644 index 0000000000..8bce1b9ce8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -0,0 
+1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mock_connection.h" +#include "test.pb.h" +#include "RpcHeader.pb.h" +#include "rpc/rpc_connection.h" + +#include + +#include + +using ::hadoop::common::RpcResponseHeaderProto; +using ::hadoop::common::EmptyRequestProto; +using ::hadoop::common::EmptyResponseProto; +using ::hadoop::common::EchoRequestProto; +using ::hadoop::common::EchoResponseProto; + +using ::asio::error_code; + +using ::testing::Return; + +using ::std::make_pair; +using ::std::string; + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +namespace hdfs { + +class MockRPCConnection : public MockConnectionBase { +public: + MockRPCConnection(::asio::io_service &io_service) + : MockConnectionBase(&io_service) {} + MOCK_METHOD0(Produce, ProducerResult()); + template + void async_connect(const Endpoint &, Callback &&handler) { + handler(::asio::error_code()); + } + void cancel() {} + void close() {} +}; + +static inline std::pair +RpcResponse(const RpcResponseHeaderProto &h, const std::string &data, + const ::asio::error_code &ec = error_code()) { + uint32_t payload_length = + pbio::CodedOutputStream::VarintSize32(h.ByteSize()) + + pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() + + data.size(); + + std::string res; + res.resize(sizeof(uint32_t) + payload_length); + uint8_t *buf = reinterpret_cast(const_cast(res.c_str())); + + buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray( + htonl(payload_length), buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf); + buf = h.SerializeWithCachedSizesToArray(buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf); + buf = pbio::CodedOutputStream::WriteStringToArray(data, buf); + + return std::make_pair(ec, std::move(res)); +} +} + +using namespace hdfs; + +TEST(RpcEngineTest, TestRoundTrip) { + ::asio::io_service io_service; + Options options; + RpcEngine engine(&io_service, options, "foo", "protocol", 1); + RpcConnectionImpl *conn = + new RpcConnectionImpl(&engine); + EchoResponseProto server_resp; + server_resp.set_message("foo"); + + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(conn->next_layer(), Produce()) + .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); + + std::unique_ptr conn_ptr(conn); + engine.TEST_SetRpcConnection(&conn_ptr); + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr resp(new EchoResponseProto()); + engine.AsyncRpc("test", &req, resp, [resp, &io_service](const Status &stat) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ("foo", resp->message()); + io_service.stop(); + }); + conn->Start(); + io_service.run(); +} + +TEST(RpcEngineTest, 
TestConnectionReset) {
+  ::asio::io_service io_service;
+  Options options;
+  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcConnectionImpl<MockRPCConnection> *conn =
+      new RpcConnectionImpl<MockRPCConnection>(&engine);
+
+  RpcResponseHeaderProto h;
+  h.set_callid(1);
+  h.set_status(RpcResponseHeaderProto::SUCCESS);
+  EXPECT_CALL(conn->next_layer(), Produce())
+      .WillOnce(Return(RpcResponse(
+          h, "", make_error_code(::asio::error::connection_reset))));
+
+  std::unique_ptr<RpcConnection> conn_ptr(conn);
+  engine.TEST_SetRpcConnection(&conn_ptr);
+
+  EchoRequestProto req;
+  req.set_message("foo");
+  std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
+
+  engine.AsyncRpc("test", &req, resp, [&io_service](const Status &stat) {
+    ASSERT_FALSE(stat.ok());
+  });
+
+  engine.AsyncRpc("test", &req, resp, [&io_service](const Status &stat) {
+    io_service.stop();
+    ASSERT_FALSE(stat.ok());
+  });
+  conn->Start();
+  io_service.run();
+}
+
+TEST(RpcEngineTest, TestTimeout) {
+  ::asio::io_service io_service;
+  Options options;
+  options.rpc_timeout = 1;
+  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcConnectionImpl<MockRPCConnection> *conn =
+      new RpcConnectionImpl<MockRPCConnection>(&engine);
+
+  EXPECT_CALL(conn->next_layer(), Produce()).Times(0);
+
+  std::unique_ptr<RpcConnection> conn_ptr(conn);
+  engine.TEST_SetRpcConnection(&conn_ptr);
+
+  EchoRequestProto req;
+  req.set_message("foo");
+  std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
+  engine.AsyncRpc("test", &req, resp, [resp, &io_service](const Status &stat) {
+    io_service.stop();
+    ASSERT_FALSE(stat.ok());
+  });
+
+  ::asio::deadline_timer timer(io_service);
+  timer.expires_from_now(std::chrono::milliseconds(options.rpc_timeout * 2));
+  timer.async_wait(std::bind(&RpcConnection::Start, conn));
+  io_service.run();
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}
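
Usage sketch (illustrative, not part of the patch): the snippet below shows how a caller might exercise the Options-based FileSystem::New() overload introduced above, so that RPCs complete with a non-OK Status rather than aborting when the connection times out or is reset. The ConnectWithRpcTimeout() helper, the handler signature, and the NameNode host/port are assumptions made for this sketch; only Options::rpc_timeout and the New() parameters come from the patch itself.

// Hypothetical caller of the new API; values and helper name are placeholders.
#include "libhdfspp/hdfs.h"

void ConnectWithRpcTimeout(hdfs::IoService *io_service) {
  hdfs::Options options;
  options.rpc_timeout = 5000;  // fail outstanding RPCs after roughly 5 seconds

  hdfs::FileSystem::New(
      io_service, options, "namenode.example.com", "8020",
      [](const hdfs::Status &status, hdfs::FileSystem *fs) {
        if (!status.ok()) {
          // With this change the callback receives a non-OK Status when the
          // RPC connection times out or is reset, instead of the client
          // hitting an assert().
          return;
        }
        // ... issue reads through fs ...
      });
}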