From 6dd47cae8655f92ab2ae2729c0c08738db598405 Mon Sep 17 00:00:00 2001
From: James
Date: Fri, 14 Oct 2016 10:13:24 -0400
Subject: [PATCH] libhdfs++: Get rid of lock in RpcConnectionImpl destructor.
 Contributed by James Clampffer

---
 .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++++++++++---------
 .../native/libhdfspp/tests/rpc_engine_test.cc |  6 +-
 2 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 869be400b7..a6a07c4776 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -34,9 +34,11 @@ namespace hdfs {
 
-template <class NextLayer>
+template <class Socket>
 class RpcConnectionImpl : public RpcConnection {
 public:
+  MEMCHECKED_CLASS(RpcConnectionImpl);
+
   RpcConnectionImpl(RpcEngine *engine);
   virtual ~RpcConnectionImpl() override;
@@ -55,7 +57,7 @@ public:
   virtual void FlushPendingRequests() override;
 
-  NextLayer &next_layer() { return next_layer_; }
+  Socket &TEST_get_mutable_socket() { return socket_; }
 
   void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
@@ -63,35 +65,34 @@ public:
   const Options options_;
   ::asio::ip::tcp::endpoint current_endpoint_;
   std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
-  NextLayer next_layer_;
+  Socket socket_;
   ::asio::deadline_timer connect_timer_;
 
   void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
 };
 
-template <class NextLayer>
-RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
+template <class Socket>
+RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
     : RpcConnection(engine),
       options_(engine->options()),
-      next_layer_(engine->io_service()),
+      socket_(engine->io_service()),
       connect_timer_(engine->io_service())
 {
   LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
 }
 
-template <class NextLayer>
-RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
+template <class Socket>
+RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
   LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
 
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
   if (pending_requests_.size() > 0)
     LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
   if (requests_on_fly_.size() > 0)
     LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::Connect(
+template <class Socket>
+void RpcConnectionImpl<Socket>::Connect(
     const std::vector<::asio::ip::tcp::endpoint> &server,
     const AuthInfo & auth_info,
     RpcCallback &handler) {
@@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect(
   this->ConnectAndFlush(server);  // need "this" so compiler can infer type of CAF
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectAndFlush(
     const std::vector<::asio::ip::tcp::endpoint> &server) {
 
   LOG_INFO(kRPC, << "ConnectAndFlush called");
@@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush(
   current_endpoint_ = first_endpoint;
 
   auto shared_this = shared_from_this();
-  next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
    ConnectComplete(ec, first_endpoint);
   });
@@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush(
   });
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
-  auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
+  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
   connect_timer_.cancel();
@@ -190,7 +191,7 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec,
     });
   } else {
     LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
-    std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_));
+    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
     if(!err.empty()) {
       LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
     }
@@ -202,7 +203,7 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec,
       additional_endpoints_.erase(additional_endpoints_.begin());
       current_endpoint_ = next_endpoint;
 
-      next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
        ConnectComplete(ec, next_endpoint);
       });
       connect_timer_.expires_from_now(
@@ -219,8 +220,8 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec,
   }
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 
   LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
@@ -228,7 +229,7 @@ void RpcConnectionImpl::SendHandshake(RpcCallback &handler) {
   auto shared_this = shared_from_this();
   auto handshake_packet = PrepareHandshakePacket();
 
-  ::asio::async_write(next_layer_, asio::buffer(*handshake_packet),
+  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
                       [handshake_packet, handler, shared_this, this](
                           const ::asio::error_code &ec, size_t) {
                         Status status = ToStatus(ec);
@@ -236,15 +237,15 @@ void RpcConnectionImpl::SendHandshake(RpcCallback &handler) {
       });
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) {
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 
   LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
 
   auto shared_this = shared_from_this();
   auto context_packet = PrepareContextPacket();
 
-  ::asio::async_write(next_layer_, asio::buffer(*context_packet),
+  ::asio::async_write(socket_, asio::buffer(*context_packet),
                       [context_packet, handler, shared_this, this](
                           const ::asio::error_code &ec, size_t) {
                         Status status = ToStatus(ec);
@@ -252,8 +253,8 @@ void RpcConnectionImpl::SendContext(RpcCallback &handler) {
       });
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
@@ -271,8 +272,8 @@ void RpcConnectionImpl::OnSendCompleted(const ::asio::error_code &ec,
   FlushPendingRequests();
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
+template <class Socket>
+void RpcConnectionImpl<Socket>::FlushPendingRequests() {
   using namespace ::std::placeholders;
 
   // Lock should be held
@@ -335,7 +336,7 @@ void RpcConnectionImpl::FlushPendingRequests() {
        this->HandleRpcTimeout(timeout_req, ec);
       });
 
-      asio::async_write(next_layer_, asio::buffer(*payload),
+      asio::async_write(socket_, asio::buffer(*payload),
                         [shared_this, this, payload](const ::asio::error_code &ec,
                                                      size_t size) {
                           OnSendCompleted(ec, size);
@@ -352,8 +353,8 @@ void RpcConnectionImpl::FlushPendingRequests() {
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec,
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
@@ -396,7 +397,7 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ori
     auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
                               sizeof(current_response_state_->length_));
     asio::async_read(
-        next_layer_, buf,
+        socket_, buf,
         [shared_this, this](const ::asio::error_code &ec, size_t size) {
           OnRecvCompleted(ec, size);
         });
@@ -405,7 +406,7 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ori
     current_response_state_->length_ = ntohl(current_response_state_->length_);
     current_response_state_->data_.resize(current_response_state_->length_);
     asio::async_read(
-        next_layer_, ::asio::buffer(current_response_state_->data_),
+        socket_, ::asio::buffer(current_response_state_->data_),
         [shared_this, this](const ::asio::error_code &ec, size_t size) {
           OnRecvCompleted(ec, size);
         });
@@ -425,8 +426,8 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ori
   }
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::Disconnect() {
+template <class Socket>
+void RpcConnectionImpl<Socket>::Disconnect() {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 
   LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
@@ -434,7 +435,7 @@ void RpcConnectionImpl::Disconnect() {
   request_over_the_wire_.reset();
   if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
     // Don't print out errors, we were expecting a disconnect here
-    SafeDisconnect(get_asio_socket_ptr(&next_layer_));
+    SafeDisconnect(get_asio_socket_ptr(&socket_));
   }
   connected_ = kDisconnected;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index 3e8c93fa8c..08218f67c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -118,7 +118,7 @@ TEST(RpcEngineTest, TestRoundTrip) {
   RpcResponseHeaderProto h;
   h.set_callid(1);
   h.set_status(RpcResponseHeaderProto::SUCCESS);
-  EXPECT_CALL(conn->next_layer(), Produce())
+  EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
       .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
 
   std::shared_ptr<RpcConnection> conn_ptr(conn);
@@ -153,7 +153,7 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
   RpcResponseHeaderProto h;
   h.set_callid(1);
   h.set_status(RpcResponseHeaderProto::SUCCESS);
-  EXPECT_CALL(conn->next_layer(), Produce())
+  EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
       .WillOnce(Return(RpcResponse(
           h, "", make_error_code(::asio::error::connection_reset))));
@@ -455,7 +455,7 @@ TEST(RpcEngineTest, TestTimeout) {
   conn->TEST_set_connected(true);
   conn->StartReading();
 
-  EXPECT_CALL(conn->next_layer(), Produce())
+  EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
      .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
   std::shared_ptr<RpcConnection> conn_ptr(conn);