From ed77d8d5df23be776e87032c9667b9025e889504 Mon Sep 17 00:00:00 2001 From: Bob Hansen Date: Mon, 4 Apr 2016 08:53:52 -0700 Subject: [PATCH] HDFS-10231: libhdfs++: Fix race conditions in RPC layer. Contributed by Bob Hansen. --- .../libhdfspp/lib/rpc/rpc_connection.cc | 86 ++++++++++++------- .../native/libhdfspp/lib/rpc/rpc_connection.h | 52 +++++++++-- .../native/libhdfspp/lib/rpc/rpc_engine.cc | 81 ++++++++--------- .../native/libhdfspp/lib/rpc/rpc_engine.h | 23 +++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 12 +-- 5 files changed, 158 insertions(+), 96 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index bed33474f5..6c3b82ed73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -158,15 +158,17 @@ void Request::OnResponseArrived(pbio::CodedInputStream *is, RpcConnection::RpcConnection(LockFreeRpcEngine *engine) : engine_(engine), - connected_(false) {} + connected_(kNotYetConnected) {} ::asio::io_service &RpcConnection::io_service() { return engine_->io_service(); } void RpcConnection::StartReading() { - io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this, - ::asio::error_code(), 0)); + auto shared_this = shared_from_this(); + io_service().post([shared_this, this] () { + OnRecvCompleted(::asio::error_code(), 0); + }); } void RpcConnection::AsyncFlushPendingRequests() { @@ -174,6 +176,8 @@ void RpcConnection::AsyncFlushPendingRequests() { io_service().post([shared_this, this]() { std::lock_guard state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc called (connected=" << ToString(connected_) << ")"); + if (!request_over_the_wire_) { FlushPendingRequests(); } @@ -281,40 +285,53 @@ void RpcConnection::AsyncRpc( auto r = std::make_shared(engine_, method_name, req, std::move(wrapped_handler)); - pending_requests_.push_back(r); - FlushPendingRequests(); -} -void RpcConnection::AsyncRawRpc(const std::string &method_name, - const std::string &req, - std::shared_ptr resp, - RpcCallback &&handler) { - std::lock_guard state_lock(connection_state_lock_); + if (connected_ == kDisconnected) { + // Oops. The connection failed _just_ before the engine got a chance + // to send it. Register it as a failure + Status status = Status::ResourceUnavailable("RpcConnection closed before send."); + auto r_vector = std::vector > (1, r); + assert(r_vector[0].get() != nullptr); - std::shared_ptr shared_this = shared_from_this(); - auto wrapped_handler = [shared_this, this, resp, handler]( - pbio::CodedInputStream *is, const Status &status) { - if (status.ok()) { - uint32_t size = 0; - is->ReadVarint32(&size); - auto limit = is->PushLimit(size); - is->ReadString(resp.get(), limit); - is->PopLimit(limit); + engine_->AsyncRpcCommsError(status, shared_from_this(), r_vector); + } else { + pending_requests_.push_back(r); + + if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking + FlushPendingRequests(); } - handler(status); - }; - - auto r = std::make_shared(engine_, method_name, req, - std::move(wrapped_handler)); - pending_requests_.push_back(r); - FlushPendingRequests(); + } } +void RpcConnection::AsyncRpc(const std::vector > & requests) { + std::lock_guard state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc[] called; connected=" << ToString(connected_)); + + if (connected_ == kDisconnected) { + // Oops. The connection failed _just_ before the engine got a chance + // to send it. Register it as a failure + Status status = Status::ResourceUnavailable("RpcConnection closed before send."); + engine_->AsyncRpcCommsError(status, shared_from_this(), requests); + } else { + pending_requests_.reserve(pending_requests_.size() + requests.size()); + for (auto r: requests) { + pending_requests_.push_back(r); + } + if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking + FlushPendingRequests(); + } + } +} + + void RpcConnection::PreEnqueueRequests( std::vector> requests) { // Public method - acquire lock std::lock_guard state_lock(connection_state_lock_); - assert(!connected_); + + LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called"); + + assert(connected_ == kNotYetConnected); pending_requests_.insert(pending_requests_.end(), requests.begin(), requests.end()); @@ -349,7 +366,7 @@ void RpcConnection::CommsError(const Status &status) { std::make_move_iterator(pending_requests_.end())); pending_requests_.clear(); - engine_->AsyncRpcCommsError(status, requestsToReturn); + engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn); } void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { @@ -379,4 +396,15 @@ std::shared_ptr RpcConnection::RemoveFromRunningQueue(int call_id) { requests_on_fly_.erase(it); return req; } + +std::string RpcConnection::ToString(ConnectedState connected) { + switch(connected) { + case kNotYetConnected: return "NotYetConnected"; + case kConnecting: return "Connecting"; + case kConnected: return "Connected"; + case kDisconnected: return "Disconnected"; + default: return "Invalid ConnectedState"; + } +} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index cab14fa290..4c33a4120a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,6 +34,8 @@ template class RpcConnectionImpl : public RpcConnection { public: RpcConnectionImpl(RpcEngine *engine); + virtual ~RpcConnectionImpl() override; + virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler); virtual void ConnectAndFlush( @@ -49,7 +51,7 @@ public: NextLayer &next_layer() { return next_layer_; } - void TEST_set_connected(bool new_value) { connected_ = new_value; } + void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } private: const Options options_; @@ -66,7 +68,19 @@ RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) options_(engine->options()), next_layer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); - } +} + +template +RpcConnectionImpl::~RpcConnectionImpl() { + LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); + + std::lock_guard state_lock(connection_state_lock_); + if (pending_requests_.size() > 0) + LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); + if (requests_on_fly_.size() > 0) + LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); +} + template void RpcConnectionImpl::Connect( @@ -93,6 +107,16 @@ void RpcConnectionImpl::ConnectAndFlush( return; } + if (connected_ == kConnected) { + FlushPendingRequests(); + return; + } + if (connected_ != kNotYetConnected) { + LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_)); + return; + } + connected_ = kConnecting; + // Take the first endpoint, but remember the alternatives for later additional_endpoints_ = server; ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front(); @@ -169,8 +193,8 @@ void RpcConnectionImpl::Handshake(RpcCallback &handler) { [handshake_packet, handler, shared_this, this]( const ::asio::error_code &ec, size_t) { Status status = ToStatus(ec); - if (status.ok()) { - connected_ = true; + if (status.ok() && connected_ == kConnecting) { + connected_ = kConnected; } handler(status); }); @@ -208,9 +232,13 @@ void RpcConnectionImpl::FlushPendingRequests() { return; } - if (!connected_) { + if (connected_ == kDisconnected) { + LOG_WARN(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a disconnected connection"); return; } + if (connected_ != kConnected) { + LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection"); + } // Don't send if we don't need to if (request_over_the_wire_) { @@ -218,19 +246,25 @@ void RpcConnectionImpl::FlushPendingRequests() { } std::shared_ptr req = pending_requests_.front(); + auto weak_req = std::weak_ptr(req); pending_requests_.erase(pending_requests_.begin()); std::shared_ptr shared_this = shared_from_this(); + auto weak_this = std::weak_ptr(shared_this); std::shared_ptr payload = std::make_shared(); req->GetPacket(payload.get()); if (!payload->empty()) { + assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end()); requests_on_fly_[req->call_id()] = req; request_over_the_wire_ = req; req->timer().expires_from_now( std::chrono::milliseconds(options_.rpc_timeout)); - req->timer().async_wait([shared_this, this, req](const ::asio::error_code &ec) { - this->HandleRpcTimeout(req, ec); + req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) { + auto timeout_this = weak_this.lock(); + auto timeout_req = weak_req.lock(); + if (timeout_this && timeout_req) + this->HandleRpcTimeout(timeout_req, ec); }); asio::async_write(next_layer_, asio::buffer(*payload), @@ -320,11 +354,11 @@ void RpcConnectionImpl::Disconnect() { LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); request_over_the_wire_.reset(); - if (connected_) { + if (connected_ == kConnecting || connected_ == kConnected) { next_layer_.cancel(); next_layer_.close(); } - connected_ = false; + connected_ = kDisconnected; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 70b50cfa88..132bb69a73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -61,7 +61,6 @@ void RpcEngine::Shutdown() { LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); io_service_->post([this]() { std::lock_guard state_lock(engine_state_lock_); - conn_->Disconnect(); conn_.reset(); }); } @@ -122,47 +121,33 @@ std::shared_ptr RpcEngine::InitializeConnection() return result; } - -Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, - std::shared_ptr resp) { - LOG_TRACE(kRPC, << "RpcEngine::RawRpc called"); - - std::shared_ptr conn; - { - std::lock_guard state_lock(engine_state_lock_); - if (!conn_) { - conn_ = InitializeConnection(); - conn_->ConnectAndFlush(last_endpoints_); - } - conn = conn_; - } - - auto stat = std::make_shared>(); - std::future future(stat->get_future()); - conn->AsyncRawRpc(method_name, req, resp, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} - void RpcEngine::AsyncRpcCommsError( const Status &status, + std::shared_ptr failedConnection, std::vector> pendingRequests) { - LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called"); + LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size()); - io_service().post([this, status, pendingRequests]() { - RpcCommsError(status, pendingRequests); + io_service().post([this, status, failedConnection, pendingRequests]() { + RpcCommsError(status, failedConnection, pendingRequests); }); } void RpcEngine::RpcCommsError( const Status &status, + std::shared_ptr failedConnection, std::vector> pendingRequests) { (void)status; - LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called"); + LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size()); std::lock_guard state_lock(engine_state_lock_); + // If the failed connection is the current one, shut it down + // It will be reconnected when there is work to do + if (failedConnection == conn_) { + conn_.reset(); + } + auto head_action = optional(); // Filter out anything with too many retries already @@ -192,25 +177,35 @@ void RpcEngine::RpcCommsError( } } - // Close the connection and retry and requests that might have been sent to - // the NN - if (!pendingRequests.empty() && - head_action && head_action->action != RetryAction::FAIL) { - conn_ = InitializeConnection(); + // If we have reqests that need to be re-sent, ensure that we have a connection + // and send the requests to it + bool haveRequests = !pendingRequests.empty() && + head_action && head_action->action != RetryAction::FAIL; - conn_->PreEnqueueRequests(pendingRequests); - if (head_action->delayMillis > 0) { - retry_timer.expires_from_now( - std::chrono::milliseconds(options_.rpc_retry_delay_ms)); - retry_timer.async_wait([this](asio::error_code ec) { - if (!ec) conn_->ConnectAndFlush(last_endpoints_); - }); + if (haveRequests) { + bool needNewConnection = !conn_; + if (needNewConnection) { + conn_ = InitializeConnection(); + conn_->PreEnqueueRequests(pendingRequests); + + if (head_action->delayMillis > 0) { + auto weak_conn = std::weak_ptr(conn_); + retry_timer.expires_from_now( + std::chrono::milliseconds(options_.rpc_retry_delay_ms)); + retry_timer.async_wait([this, weak_conn](asio::error_code ec) { + auto strong_conn = weak_conn.lock(); + if ( (!ec) && (strong_conn) ) { + strong_conn->ConnectAndFlush(last_endpoints_); + } + }); + } else { + conn_->ConnectAndFlush(last_endpoints_); + } } else { - conn_->ConnectAndFlush(last_endpoints_); + // We have an existing connection (which might be closed; we don't know + // until we hold the connection local) and should just add the new requests + conn_->AsyncRpc(pendingRequests); } - } else { - // Connection will try again if someone calls AsyncRpc - conn_.reset(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 7b66ac03c8..8ea6e8d529 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -125,8 +125,7 @@ class RpcConnection : public std::enable_shared_from_this { std::shared_ptr<::google::protobuf::MessageLite> resp, const RpcCallback &handler); - void AsyncRawRpc(const std::string &method_name, const std::string &request, - std::shared_ptr resp, RpcCallback &&handler); + void AsyncRpc(const std::vector > & requests); // Enqueue requests before the connection is connected. Will be flushed // on connect @@ -182,7 +181,15 @@ class RpcConnection : public std::enable_shared_from_this { // Connection can have deferred connection, especially when we're pausing // during retry - bool connected_; + enum ConnectedState { + kNotYetConnected, + kConnecting, + kConnected, + kDisconnected + }; + static std::string ToString(ConnectedState connected); + + ConnectedState connected_; // The request being sent over the wire; will also be in requests_on_fly_ std::shared_ptr request_over_the_wire_; // Requests to be sent over the wire @@ -207,6 +214,7 @@ class LockFreeRpcEngine { public: /* Enqueues a CommsError without acquiring a lock*/ virtual void AsyncRpcCommsError(const Status &status, + std::shared_ptr failedConnection, std::vector> pendingRequests) = 0; @@ -254,19 +262,16 @@ class RpcEngine : public LockFreeRpcEngine { Status Rpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp); - /** - * Send raw bytes as RPC payload. This is intended to be used in JNI - * bindings only. - **/ - Status RawRpc(const std::string &method_name, const std::string &req, - std::shared_ptr resp); + void Start(); void Shutdown(); /* Enqueues a CommsError without acquiring a lock*/ void AsyncRpcCommsError(const Status &status, + std::shared_ptr failedConnection, std::vector> pendingRequests) override; void RpcCommsError(const Status &status, + std::shared_ptr failedConnection, std::vector> pendingRequests); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index b7d5d0bb21..b5f4d9ad7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -106,8 +106,8 @@ TEST(RpcEngineTest, TestRoundTrip) { ::asio::io_service io_service; Options options; RpcEngine engine(&io_service, options, "foo", "", "protocol", 1); - RpcConnectionImpl *conn = - new RpcConnectionImpl(&engine); + auto conn = + std::make_shared >(&engine); conn->TEST_set_connected(true); conn->StartReading(); @@ -142,8 +142,8 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) { ::asio::io_service io_service; Options options; RpcEngine engine(&io_service, options, "foo", "", "protocol", 1); - RpcConnectionImpl *conn = - new RpcConnectionImpl(&engine); + auto conn = + std::make_shared >(&engine); conn->TEST_set_connected(true); conn->StartReading(); @@ -436,8 +436,8 @@ TEST(RpcEngineTest, TestTimeout) { Options options; options.rpc_timeout = 1; RpcEngine engine(&io_service, options, "foo", "", "protocol", 1); - RpcConnectionImpl *conn = - new RpcConnectionImpl(&engine); + auto conn = + std::make_shared >(&engine); conn->TEST_set_connected(true); conn->StartReading();