From 40a1f3631ddefd09f31ef78eccafdcb6257caab5 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Wed, 8 Jul 2015 13:13:03 -0700 Subject: [PATCH] HDFS-8737. Initial implementation of a Hadoop RPC v9 client. Contributed by Haohui Mai. --- .../src/main/native/libhdfspp/CMakeLists.txt | 12 +- .../libhdfspp/include/libhdfspp/status.h | 97 ++++++++ .../main/native/libhdfspp/lib/CMakeLists.txt | 2 + .../main/native/libhdfspp/lib/common/util.h | 58 +++++ .../native/libhdfspp/lib/proto/CMakeLists.txt | 21 ++ .../native/libhdfspp/lib/rpc/CMakeLists.txt | 3 + .../libhdfspp/lib/rpc/rpc_connection.cc | 225 ++++++++++++++++++ .../native/libhdfspp/lib/rpc/rpc_connection.h | 149 ++++++++++++ .../native/libhdfspp/lib/rpc/rpc_engine.cc | 98 ++++++++ .../native/libhdfspp/lib/rpc/rpc_engine.h | 160 +++++++++++++ 10 files changed, 824 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt index 2986b886e2..f4bc8b88c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt @@ -18,6 +18,9 @@ project (libhdfspp) +find_package(Protobuf REQUIRED) +find_package(Threads) + add_definitions(-DASIO_STANDALONE -DASIO_CPP11_DATE_TIME) if(UNIX) @@ -30,6 +33,13 @@ if(APPLE) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -Wno-deprecated-declarations") endif() -include_directories(third_party/gmock-1.7.0) +include_directories( + include + lib + ${PROJECT_BINARY_DIR}/lib/proto + third_party/asio-1.10.2/include + third_party/gmock-1.7.0 +) add_subdirectory(third_party/gmock-1.7.0) +add_subdirectory(lib) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h new file mode 100644 index 0000000000..9436c8b649 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_STATUS_H_ +#define LIBHDFSPP_STATUS_H_ + +#include +#include + +namespace hdfs { + +class StatusHelper; +class Status { + public: + // Create a success status. + Status() : state_(NULL) { } + ~Status() { delete[] state_; } + explicit Status(int code, const char *msg); + + // Copy the specified status. + Status(const Status& s); + void operator=(const Status& s); + + // Return a success status. + static Status OK() { return Status(); } + static Status InvalidArgument(const char *msg) + { return Status(kInvalidArgument, msg); } + static Status ResourceUnavailable(const char *msg) + { return Status(kResourceUnavailable, msg); } + static Status Unimplemented() + { return Status(kUnimplemented, ""); } + static Status Exception(const char *expception_class_name, const char *error_message) + { return Status(kException, expception_class_name, error_message); } + + // Returns true iff the status indicates success. + bool ok() const { return (state_ == NULL); } + + // Return a string representation of this status suitable for printing. + // Returns the string "OK" for success. + std::string ToString() const; + + int code() const { + return (state_ == NULL) ? kOk : static_cast(state_[4]); + } + + private: + // OK status has a NULL state_. Otherwise, state_ is a new[] array + // of the following form: + // state_[0..3] == length of message + // state_[4] == code + // state_[5..] == message + const char* state_; + friend class StatusHelper; + + enum Code { + kOk = 0, + kInvalidArgument = static_cast(std::errc::invalid_argument), + kResourceUnavailable = static_cast(std::errc::resource_unavailable_try_again), + kUnimplemented = static_cast(std::errc::function_not_supported), + kException = 256, + }; + + explicit Status(int code, const char *msg1, const char *msg2); + static const char *CopyState(const char* s); + static const char *ConstructState(int code, const char *msg1, const char *msg2); +}; + +inline Status::Status(const Status& s) { + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); +} + +inline void Status::operator=(const Status& s) { + // The following condition catches both aliasing (when this == &s), + // and the common case where both s and *this are ok. + if (state_ != s.state_) { + delete[] state_; + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); + } +} + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt new file mode 100644 index 0000000000..7458453751 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt @@ -0,0 +1,2 @@ +add_subdirectory(rpc) +add_subdirectory(proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h new file mode 100644 index 0000000000..ff9f36c889 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_UTIL_H_ +#define LIB_COMMON_UTIL_H_ + +#include "libhdfspp/status.h" + +#include + +#include +#include + +namespace hdfs { + +static inline Status ToStatus(const ::asio::error_code &ec) { + if (ec) { + return Status(ec.value(), ec.message().c_str()); + } else { + return Status::OK(); + } +} + +static inline int DelimitedPBMessageSize( + const ::google::protobuf::MessageLite *msg) { + size_t size = msg->ByteSize(); + return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size; +} + +static inline void ReadDelimitedPBMessage( + ::google::protobuf::io::CodedInputStream *in, + ::google::protobuf::MessageLite *msg) { + uint32_t size = 0; + in->ReadVarint32(&size); + auto limit = in->PushLimit(size); + msg->ParseFromCodedStream(in); + in->PopLimit(limit); +} + +std::string Base64Encode(const std::string &src); + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt new file mode 100644 index 0000000000..156a7f440a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt @@ -0,0 +1,21 @@ +set(CLIENT_PROTO_DIR ${CMAKE_SOURCE_DIR}/../proto) +set(COMMON_PROTO_DIR ${CMAKE_SOURCE_DIR}/../../../../../hadoop-common-project/hadoop-common/src/main/proto) +set(PROTOBUF_IMPORT_DIRS ${CLIENT_PROTO_DIR} ${COMMON_PROTO_DIR}) + +protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS + ${CLIENT_PROTO_DIR}/datatransfer.proto + ${CLIENT_PROTO_DIR}/ClientDatanodeProtocol.proto + ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto + ${CLIENT_PROTO_DIR}/acl.proto + ${CLIENT_PROTO_DIR}/datatransfer.proto + ${CLIENT_PROTO_DIR}/encryption.proto + ${CLIENT_PROTO_DIR}/hdfs.proto + ${CLIENT_PROTO_DIR}/inotify.proto + ${CLIENT_PROTO_DIR}/xattr.proto + ${COMMON_PROTO_DIR}/IpcConnectionContext.proto + ${COMMON_PROTO_DIR}/ProtobufRpcEngine.proto + ${COMMON_PROTO_DIR}/RpcHeader.proto + ${COMMON_PROTO_DIR}/Security.proto +) + +add_library(proto ${PROTO_SRCS} ${PROTO_HDRS}) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt new file mode 100644 index 0000000000..aa3951c3c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt @@ -0,0 +1,3 @@ +include_directories(${OPENSSL_INCLUDE_DIRS}) +add_library(rpc rpc_connection.cc rpc_engine.cc) +add_dependencies(rpc proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc new file mode 100644 index 0000000000..4d08bd1579 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "rpc_engine.h" + +#include "RpcHeader.pb.h" +#include "ProtobufRpcEngine.pb.h" +#include "IpcConnectionContext.pb.h" + +#include "common/util.h" + +#include + +#include +#include + +namespace hdfs { + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +using namespace ::hadoop::common; +using namespace ::std::placeholders; + +static void +ConstructPacket(std::string *res, + std::initializer_list headers, + const std::string *request) { + int len = 0; + std::for_each( + headers.begin(), headers.end(), + [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); + if (request) { + len += pbio::CodedOutputStream::VarintSize32(request->size()) + + request->size(); + } + + int net_len = htonl(len); + res->reserve(res->size() + sizeof(net_len) + len); + + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + os.WriteRaw(reinterpret_cast(&net_len), sizeof(net_len)); + + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + assert(buf && "Cannot allocate memory"); + + std::for_each( + headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); + buf = v->SerializeWithCachedSizesToArray(buf); + }); + + if (request) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf); + buf = os.WriteStringToArray(*request, buf); + } +} + +static void SetRequestHeader(RpcEngine *engine, int call_id, + const std::string &method_name, + RpcRequestHeaderProto *rpc_header, + RequestHeaderProto *req_header) { + rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); + rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + rpc_header->set_callid(call_id); + rpc_header->set_clientid(engine->client_name()); + + req_header->set_methodname(method_name); + req_header->set_declaringclassprotocolname(engine->protocol_name()); + req_header->set_clientprotocolversion(engine->protocol_version()); +} + +RpcConnection::~RpcConnection() {} + +RpcConnection::Request::Request(RpcConnection *parent, + const std::string &method_name, + const std::string &request, Handler &&handler) + : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), + handler_(std::move(handler)) { + RpcRequestHeaderProto rpc_header; + RequestHeaderProto req_header; + SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, + &req_header); + ConstructPacket(&payload_, {&rpc_header, &req_header}, &request); +} + +RpcConnection::Request::Request(RpcConnection *parent, + const std::string &method_name, + const pb::MessageLite *request, + Handler &&handler) + : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), + handler_(std::move(handler)) { + RpcRequestHeaderProto rpc_header; + RequestHeaderProto req_header; + SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, + &req_header); + ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr); +} + +void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is, + const Status &status) { + handler_(is, status); +} + +RpcConnection::RpcConnection(RpcEngine *engine) + : engine_(engine), resp_state_(kReadLength), resp_length_(0) {} + +::asio::io_service &RpcConnection::io_service() { + return engine_->io_service(); +} + +void RpcConnection::Start() { + io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this, + ::asio::error_code(), 0)); +} + +void RpcConnection::FlushPendingRequests() { + io_service().post([this]() { + if (!request_over_the_wire_) { + OnSendCompleted(::asio::error_code(), 0); + } + }); +} + +void RpcConnection::HandleRpcResponse(const std::vector &data) { + /* assumed to be called from a context that has already acquired the + * engine_state_lock */ + pbio::ArrayInputStream ar(&data[0], data.size()); + pbio::CodedInputStream in(&ar); + in.PushLimit(data.size()); + RpcResponseHeaderProto h; + ReadDelimitedPBMessage(&in, &h); + + auto it = requests_on_fly_.find(h.callid()); + if (it == requests_on_fly_.end()) { + // TODO: out of line RPC request + assert(false && "Out of line request with unknown call id"); + } + + auto req = it->second; + requests_on_fly_.erase(it); + Status stat; + if (h.has_exceptionclassname()) { + stat = + Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); + } + req->OnResponseArrived(&in, stat); +} + +std::shared_ptr RpcConnection::PrepareHandshakePacket() { + static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c', + RpcEngine::kRpcVersion, 0, 0}; + auto res = + std::make_shared(kHandshakeHeader, sizeof(kHandshakeHeader)); + + RpcRequestHeaderProto h; + h.set_rpckind(RPC_PROTOCOL_BUFFER); + h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + h.set_callid(RpcEngine::kCallIdConnectionContext); + h.set_clientid(engine_->client_name()); + + IpcConnectionContextProto handshake; + handshake.set_protocol(engine_->protocol_name()); + ConstructPacket(res.get(), {&h, &handshake}, nullptr); + return res; +} + +void RpcConnection::AsyncRpc( + const std::string &method_name, const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) { + std::lock_guard state_lock(engine_state_lock_); + + auto wrapped_handler = + [resp, handler](pbio::CodedInputStream *is, const Status &status) { + if (status.ok()) { + ReadDelimitedPBMessage(is, resp.get()); + } + handler(status); + }; + + auto r = std::make_shared(this, method_name, req, + std::move(wrapped_handler)); + pending_requests_.push_back(r); + FlushPendingRequests(); +} + +void RpcConnection::AsyncRawRpc(const std::string &method_name, + const std::string &req, + std::shared_ptr resp, + Callback &&handler) { + std::lock_guard state_lock(engine_state_lock_); + + auto wrapped_handler = + [this, resp, handler](pbio::CodedInputStream *is, const Status &status) { + if (status.ok()) { + uint32_t size = 0; + is->ReadVarint32(&size); + auto limit = is->PushLimit(size); + is->ReadString(resp.get(), limit); + is->PopLimit(limit); + } + handler(status); + }; + + auto r = std::make_shared(this, method_name, req, + std::move(wrapped_handler)); + pending_requests_.push_back(r); + FlushPendingRequests(); +} +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h new file mode 100644 index 0000000000..a8eecf4b09 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_CONNECTION_H_ +#define LIB_RPC_RPC_CONNECTION_H_ + +#include "rpc_engine.h" +#include "common/util.h" + +#include +#include +#include + +namespace hdfs { + +template class RpcConnectionImpl : public RpcConnection { +public: + RpcConnectionImpl(RpcEngine *engine); + virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + Callback &&handler) override; + virtual void Handshake(Callback &&handler) override; + virtual void Shutdown() override; + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) override; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) override; + +private: + NextLayer next_layer_; +}; + +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) + : RpcConnection(engine) + , next_layer_(engine->io_service()) +{} + +template +void RpcConnectionImpl::Connect( + const std::vector<::asio::ip::tcp::endpoint> &server, Callback &&handler) { + ::asio::async_connect( + next_layer_, server.begin(), server.end(), + [handler](const ::asio::error_code &ec, + std::vector<::asio::ip::tcp::endpoint>::const_iterator) { + handler(ToStatus(ec)); + }); +} + +template +void RpcConnectionImpl::Handshake(Callback &&handler) { + auto handshake_packet = PrepareHandshakePacket(); + ::asio::async_write( + next_layer_, asio::buffer(*handshake_packet), + [handshake_packet, handler](const ::asio::error_code &ec, size_t) { + handler(ToStatus(ec)); + }); +} + +template +void RpcConnectionImpl::OnSendCompleted(const ::asio::error_code &ec, + size_t) { + using std::placeholders::_1; + using std::placeholders::_2; + std::lock_guard state_lock(engine_state_lock_); + + request_over_the_wire_.reset(); + if (ec) { + // TODO: Current RPC has failed -- we should abandon the + // connection and do proper clean up + assert(false && "Unimplemented"); + } + + if (!pending_requests_.size()) { + return; + } + + std::shared_ptr req = pending_requests_.front(); + pending_requests_.erase(pending_requests_.begin()); + requests_on_fly_[req->call_id()] = req; + request_over_the_wire_ = req; + + // TODO: set the timeout for the RPC request + + asio::async_write( + next_layer_, asio::buffer(req->payload()), + std::bind(&RpcConnectionImpl::OnSendCompleted, this, _1, _2)); +} + +template +void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ec, + size_t) { + using std::placeholders::_1; + using std::placeholders::_2; + std::lock_guard state_lock(engine_state_lock_); + + switch (ec.value()) { + case 0: + // No errors + break; + case asio::error::operation_aborted: + // The event loop has been shut down. Ignore the error. + return; + default: + assert(false && "Unimplemented"); + } + + if (resp_state_ == kReadLength) { + resp_state_ = kReadContent; + auto buf = ::asio::buffer(reinterpret_cast(&resp_length_), + sizeof(resp_length_)); + asio::async_read(next_layer_, buf, + std::bind(&RpcConnectionImpl::OnRecvCompleted, + this, _1, _2)); + + } else if (resp_state_ == kReadContent) { + resp_state_ = kParseResponse; + resp_length_ = ntohl(resp_length_); + resp_data_.resize(resp_length_); + asio::async_read(next_layer_, ::asio::buffer(resp_data_), + std::bind(&RpcConnectionImpl::OnRecvCompleted, this, _1, _2)); + + } else if (resp_state_ == kParseResponse) { + resp_state_ = kReadLength; + HandleRpcResponse(resp_data_); + resp_data_.clear(); + Start(); + } +} + +template void RpcConnectionImpl::Shutdown() { + next_layer_.close(); +} +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc new file mode 100644 index 0000000000..50dce86457 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "rpc_engine.h" +#include "rpc_connection.h" +#include "common/util.h" + +#include + +#include +#include + +namespace hdfs { + +RpcEngine::RpcEngine(::asio::io_service *io_service, + const std::string &client_name, const char *protocol_name, + int protocol_version) + : io_service_(io_service), client_name_(client_name), + protocol_name_(protocol_name), protocol_version_(protocol_version), + call_id_(0) + , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)) +{} + +Status +RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) { + using ::asio::ip::tcp; + auto stat = std::make_shared>(); + std::future future(stat->get_future()); + conn_->Connect(servers, [this, stat](const Status &status) { + if (!status.ok()) { + stat->set_value(status); + return; + } + conn_->Handshake( + [this, stat](const Status &status) { stat->set_value(status); }); + }); + return future.get(); +} + +void RpcEngine::Start() { conn_->Start(); } + +void RpcEngine::Shutdown() { + io_service_->post([this]() { conn_->Shutdown(); }); +} + +void RpcEngine::AsyncRpc( + const std::string &method_name, const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp, + std::function &&handler) { + conn_->AsyncRpc(method_name, req, resp, std::move(handler)); +} + +Status +RpcEngine::Rpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp) { + auto stat = std::make_shared>(); + std::future future(stat->get_future()); + AsyncRpc(method_name, req, resp, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, + std::shared_ptr resp) { + auto stat = std::make_shared>(); + std::future future(stat->get_future()); + conn_->AsyncRawRpc(method_name, req, resp, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +std::string RpcEngine::GetRandomClientName() { + unsigned char buf[6] = { + 0, + }; + RAND_pseudo_bytes(buf, sizeof(buf)); + + std::stringstream ss; + ss << "libhdfs++_" + << Base64Encode(std::string(reinterpret_cast(buf), sizeof(buf))); + return ss.str(); +} +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h new file mode 100644 index 0000000000..cd5c0e6a47 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_ENGINE_H_ +#define LIB_RPC_RPC_ENGINE_H_ + +#include "libhdfspp/status.h" + +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace hdfs { + +class RpcEngine; +class RpcConnection { +public: + typedef std::function Callback; + virtual ~RpcConnection(); + RpcConnection(RpcEngine *engine); + virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + Callback &&handler) = 0; + virtual void Handshake(Callback &&handler) = 0; + virtual void Shutdown() = 0; + + void Start(); + void AsyncRpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + Callback &&handler); + + void AsyncRawRpc(const std::string &method_name, const std::string &request, + std::shared_ptr resp, Callback &&handler); + +protected: + RpcEngine *const engine_; + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + + ::asio::io_service &io_service(); + std::shared_ptr PrepareHandshakePacket(); + static std::string + SerializeRpcRequest(const std::string &method_name, + const ::google::protobuf::MessageLite *req); + void HandleRpcResponse(const std::vector &data); + void FlushPendingRequests(); + + enum ResponseState { + kReadLength, + kReadContent, + kParseResponse, + } resp_state_; + unsigned resp_length_; + std::vector resp_data_; + + class Request { + public: + typedef std::function Handler; + Request(RpcConnection *parent, const std::string &method_name, + const std::string &request, Handler &&callback); + Request(RpcConnection *parent, const std::string &method_name, + const ::google::protobuf::MessageLite *request, Handler &&callback); + + int call_id() const { return call_id_; } + ::asio::deadline_timer &timer() { return timer_; } + const std::string &payload() const { return payload_; } + void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, + const Status &status); + private: + const int call_id_; + ::asio::deadline_timer timer_; + std::string payload_; + Handler handler_; + }; + + // The request being sent over the wire + std::shared_ptr request_over_the_wire_; + // Requests to be sent over the wire + std::vector> pending_requests_; + // Requests that are waiting for responses + std::unordered_map> requests_on_fly_; + // Lock for mutable parts of this class that need to be thread safe + std::mutex engine_state_lock_; +}; + +class RpcEngine { +public: + enum { kRpcVersion = 9 }; + enum { + kCallIdAuthorizationFailed = -1, + kCallIdInvalid = -2, + kCallIdConnectionContext = -3, + kCallIdPing = -4 + }; + + RpcEngine(::asio::io_service *io_service, const std::string &client_name, + const char *protocol_name, int protocol_version); + + void AsyncRpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp, + std::function &&handler); + + Status Rpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp); + /** + * Send raw bytes as RPC payload. This is intended to be used in JNI + * bindings only. + **/ + Status RawRpc(const std::string &method_name, const std::string &req, + std::shared_ptr resp); + Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server); + void Start(); + void Shutdown(); + + int NextCallId() { return ++call_id_; } + + const std::string &client_name() const { return client_name_; } + const std::string &protocol_name() const { return protocol_name_; } + int protocol_version() const { return protocol_version_; } + ::asio::io_service &io_service() { return *io_service_; } + + static std::string GetRandomClientName(); + +private: + ::asio::io_service *io_service_; + const std::string client_name_; + const std::string protocol_name_; + const int protocol_version_; + std::atomic_int call_id_; + std::unique_ptr conn_; +}; +} + +#endif