From e18db92398adb456b8c15d2290b355417e39738c Mon Sep 17 00:00:00 2001
From: James
Date: Wed, 16 Dec 2015 12:27:06 -0500
Subject: [PATCH] HDFS-9523. libhdfs++: failure to connect to ipv6 host causes
 CI unit tests to fail. Contributed by Bob Hansen.

---
 .../native/libhdfspp/lib/fs/filesystem.cc     |  2 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.h | 87 ++++++++++++++-----
 .../native/libhdfspp/lib/rpc/rpc_engine.cc    | 14 +--
 .../native/libhdfspp/lib/rpc/rpc_engine.h     | 14 +--
 .../native/libhdfspp/tests/rpc_engine_test.cc | 17 +++-
 5 files changed, 93 insertions(+), 41 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 1a1163b394..fafaa1b1b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -49,7 +49,7 @@ void NameNodeOperations::Connect(const std::string &server,
   m->Push(Resolve(io_service_, server, service,
                   std::back_inserter(m->state())))
       .Push(Bind([this, m](const Continuation::Next &next) {
-        engine_.Connect(m->state().front(), next);
+        engine_.Connect(m->state(), next);
       }));
   m->Run([this, handler](const Status &status, const State &) {
     handler(status);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 76bbf2efab..26946bc65f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -33,10 +33,10 @@ template <class NextLayer>
 class RpcConnectionImpl : public RpcConnection {
 public:
   RpcConnectionImpl(RpcEngine *engine);
-  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler);
   virtual void ConnectAndFlush(
-      const ::asio::ip::tcp::endpoint &server) override;
+      const std::vector<::asio::ip::tcp::endpoint> &server) override;
   virtual void Handshake(RpcCallback &handler) override;
   virtual void Disconnect() override;
   virtual void OnSendCompleted(const ::asio::error_code &ec,
@@ -52,7 +52,11 @@ public:
 
 private:
   const Options options_;
+  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
   NextLayer next_layer_;
+
+  void ConnectComplete(const ::asio::error_code &ec);
+  void HandshakeComplete(const Status &s);
 };
 
 template <class NextLayer>
@@ -63,7 +67,7 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
-    const ::asio::ip::tcp::endpoint &server, RpcCallback &handler) {
+    const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler) {
   auto connectionSuccessfulReq = std::make_shared<Request>(
       engine_, [handler](::google::protobuf::io::CodedInputStream *is,
                          const Status &status) {
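Connect() now receives every endpoint the resolver produced rather than only
m->state().front(). The sketch below is not part of the patch; it assumes
standalone asio and an illustrative host/port, and shows why a single host
name routinely yields more than one endpoint (typically one ipv4 and one ipv6
entry), which is what this change preserves all the way down to the socket:

#include <asio.hpp>
#include <iostream>
#include <vector>

int main() {
  asio::io_service io_service;
  asio::ip::tcp::resolver resolver(io_service);

  // "localhost"/"8020" are illustrative; any dual-stack host behaves the same.
  asio::ip::tcp::resolver::query query("localhost", "8020");

  // Collect every endpoint the resolver offers, in order.
  std::vector<asio::ip::tcp::endpoint> endpoints;
  for (asio::ip::tcp::resolver::iterator it = resolver.resolve(query), end;
       it != end; ++it) {
    endpoints.push_back(*it);
  }

  // Typically prints both [::1]:8020 and 127.0.0.1:8020.
  for (const asio::ip::tcp::endpoint &ep : endpoints) {
    std::cout << ep << std::endl;
  }
  return 0;
}

If the CI host resolves the name to an ipv6 address first but ipv6 is not
routable there, connecting to only the front() endpoint fails; that is the
failure mode this patch addresses.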
@@ -76,28 +80,65 @@ void RpcConnectionImpl<NextLayer>::Connect(
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
-    const ::asio::ip::tcp::endpoint &server) {
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  next_layer_.async_connect(server,
-                            [shared_this, this](const ::asio::error_code &ec) {
-    std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-    Status status = ToStatus(ec);
-    if (status.ok()) {
-      StartReading();
-      Handshake([shared_this, this](const Status &s) {
-        std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-        if (s.ok()) {
-          FlushPendingRequests();
-        } else {
-          CommsError(s);
-        };
-      });
-    } else {
-      CommsError(status);
-    }
-  });
+    const std::vector<::asio::ip::tcp::endpoint> &server) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  if (server.empty()) {
+    Status s = Status::InvalidArgument("No endpoints provided");
+    CommsError(s);
+    return;
+  }
+
+  // Take the first endpoint, but remember the alternatives for later
+  additional_endpoints_ = server;
+  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
+  additional_endpoints_.erase(additional_endpoints_.begin());
+
+  auto shared_this = shared_from_this();
+  next_layer_.async_connect(first_endpoint, [shared_this, this](const ::asio::error_code &ec) {
+    ConnectComplete(ec);
+  });
 }
 
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec) {
+  auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  Status status = ToStatus(ec);
+  if (status.ok()) {
+    StartReading();
+    Handshake([shared_this, this](const Status &s) {
+      HandshakeComplete(s);
+    });
+  } else {
+    next_layer_.close();
+    if (!additional_endpoints_.empty()) {
+      // If we have additional endpoints, keep trying until we either run out or
+      // hit one
+      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
+      additional_endpoints_.erase(additional_endpoints_.begin());
+
+      next_layer_.async_connect(next_endpoint, [shared_this, this](const ::asio::error_code &ec) {
+        ConnectComplete(ec);
+      });
+    } else {
+      CommsError(status);
+    }
+  }
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::HandshakeComplete(const Status &s) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  if (s.ok()) {
+    FlushPendingRequests();
+  } else {
+    CommsError(s);
+  };
+}
+
+
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index c779b1cc32..a84257b30b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -39,13 +39,13 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
       call_id_(0),
       retry_timer(*io_service) {}
 
-void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
+void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                         RpcCallback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-  last_endpoint_ = server;
+  last_endpoints_ = server;
 
   conn_ = NewConnection();
-  conn_->Connect(server, handler);
+  conn_->Connect(last_endpoints_, handler);
 }
 
 void RpcEngine::Shutdown() {
@@ -75,7 +75,7 @@ void RpcEngine::AsyncRpc(
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   if (!conn_) {
     conn_ = NewConnection();
-    conn_->ConnectAndFlush(last_endpoint_);
+    conn_->ConnectAndFlush(last_endpoints_);
   }
   conn_->AsyncRpc(method_name, req, resp, handler);
 }
@@ -103,7 +103,7 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
     std::lock_guard<std::mutex> state_lock(engine_state_lock_);
     if (!conn_) {
       conn_ = NewConnection();
-      conn_->ConnectAndFlush(last_endpoint_);
+      conn_->ConnectAndFlush(last_endpoints_);
     }
     conn = conn_;
   }
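ConnectComplete() above implements a try-the-next-endpoint-on-failure loop
over async_connect. The following free-standing sketch is not part of the
patch: it reduces the same pattern to a hypothetical helper
(ConnectFirstReachable) with none of the locking or shared_from_this lifetime
management that the real RpcConnectionImpl needs:

#include <asio.hpp>
#include <functional>
#include <vector>

void ConnectFirstReachable(asio::ip::tcp::socket &socket,
                           std::vector<asio::ip::tcp::endpoint> endpoints,
                           std::function<void(const asio::error_code &)> done) {
  if (endpoints.empty()) {
    // Ran out of alternatives; report the failure to the caller.
    done(make_error_code(asio::error::host_unreachable));
    return;
  }

  // Pop the first endpoint and keep the rest as fallbacks.
  asio::ip::tcp::endpoint next = endpoints.front();
  endpoints.erase(endpoints.begin());

  socket.async_connect(
      next, [&socket, endpoints, done](const asio::error_code &ec) mutable {
        if (!ec) {
          done(ec);  // Connected; the caller can start its handshake.
        } else {
          socket.close();  // e.g. ipv6 unroutable on this host; try the next one.
          ConnectFirstReachable(socket, std::move(endpoints), done);
        }
      });
}

Note the close() before retrying: reusing a socket after a failed connect is
unreliable on some platforms, which is presumably why ConnectComplete also
closes next_layer_ before moving on.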
@@ -170,10 +170,10 @@ void RpcEngine::RpcCommsError(
       retry_timer.expires_from_now(
           std::chrono::milliseconds(options_.rpc_retry_delay_ms));
       retry_timer.async_wait([this](asio::error_code ec) {
-        if (!ec) conn_->ConnectAndFlush(last_endpoint_);
+        if (!ec) conn_->ConnectAndFlush(last_endpoints_);
       });
     } else {
-      conn_->ConnectAndFlush(last_endpoint_);
+      conn_->ConnectAndFlush(last_endpoints_);
     }
   } else {
     // Connection will try again if someone calls AsyncRpc
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 770e16349c..e6beef6f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -110,9 +110,11 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   RpcConnection(LockFreeRpcEngine *engine);
   virtual ~RpcConnection();
 
-  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+  // Note that a single server can have multiple endpoints - especially both
+  // an ipv4 and ipv6 endpoint
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler) = 0;
-  virtual void ConnectAndFlush(const ::asio::ip::tcp::endpoint &server) = 0;
+  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
   virtual void Handshake(RpcCallback &handler) = 0;
   virtual void Disconnect() = 0;
 
@@ -231,7 +233,7 @@ class RpcEngine : public LockFreeRpcEngine {
             const std::string &client_name, const char *protocol_name,
             int protocol_version);
 
-  void Connect(const ::asio::ip::tcp::endpoint &server, RpcCallback &handler);
+  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler);
 
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
@@ -272,6 +274,9 @@ class RpcEngine : public LockFreeRpcEngine {
   std::shared_ptr<RpcConnection> conn_;
   virtual std::shared_ptr<RpcConnection> NewConnection();
   virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
+
+  // Remember all of the last endpoints in case we need to reconnect and retry
+  std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
 private:
   ::asio::io_service * const io_service_;
   const Options options_;
@@ -282,9 +287,6 @@ private:
   std::atomic_int call_id_;
   ::asio::deadline_timer retry_timer;
 
-  // Remember the last endpoint in case we need to reconnect to retry
-  ::asio::ip::tcp::endpoint last_endpoint_;
-
   std::mutex engine_state_lock_;
 };
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index 71e3978068..28c75967eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -43,6 +43,12 @@ namespace pbio = ::google::protobuf::io;
 
 namespace hdfs {
 
+std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
+  std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
+  result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
+  return result;
+}
+
 class MockRPCConnection : public MockConnectionBase {
  public:
   MockRPCConnection(::asio::io_service &io_service)
@@ -61,6 +67,9 @@ class SharedConnectionEngine : public RpcEngine {
 
 protected:
   std::shared_ptr<RpcConnection> NewConnection() override {
+    // Stuff in some dummy endpoints so we don't error out
+    last_endpoints_ = make_endpoint();
+
     return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
   }
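For reference, the dummy endpoint the test stuffs into last_endpoints_ is
just a default-constructed tcp endpoint, which never sees a real connect
because the transport is mocked. A small sketch (not part of the patch) of
what that value actually is:

#include <asio.hpp>
#include <cassert>

int main() {
  // Same value as asio::ip::basic_endpoint<asio::ip::tcp>() in the test.
  asio::ip::tcp::endpoint ep;
  assert(ep.address().is_unspecified());  // 0.0.0.0
  assert(ep.port() == 0);                 // no port either
  return 0;
}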
@@ -257,7 +266,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
   EXPECT_CALL(*producer, Produce())
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -285,7 +294,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -313,7 +322,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());
@@ -342,7 +351,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
      .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());
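The tests above exercise the failure paths entirely through mocks. To watch
the fallback succeed against a real socket, one could combine the
hypothetical ConnectFirstReachable helper sketched earlier with a live
acceptor. This sketch is not part of the patch: it assumes that helper is in
scope, binds an ephemeral loopback port for the good endpoint, and assumes
loopback port 1 is closed for the bad one:

#include <asio.hpp>
#include <iostream>
#include <vector>

int main() {
  asio::io_service io_service;

  // A live listener on an ephemeral loopback port.
  asio::ip::tcp::acceptor acceptor(
      io_service,
      asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), 0));
  asio::ip::tcp::socket accepted(io_service);
  acceptor.async_accept(accepted, [](const asio::error_code &) {});

  // First endpoint is expected to fail fast; second is the live listener.
  std::vector<asio::ip::tcp::endpoint> endpoints;
  endpoints.push_back(
      asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), 1));
  endpoints.push_back(acceptor.local_endpoint());

  asio::ip::tcp::socket socket(io_service);
  ConnectFirstReachable(socket, endpoints, [](const asio::error_code &ec) {
    std::cout << (ec ? "all endpoints failed" : "connected via fallback")
              << std::endl;
  });

  io_service.run();
  return 0;
}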