HDFS-10526: libhdfs++: Add connect timeouts to async_connect calls. Contributed by Bob Hansen.
This commit is contained in:
parent
de6fce7817
commit
2616fe2025
@ -33,6 +33,13 @@ struct Options {
|
|||||||
int rpc_timeout;
|
int rpc_timeout;
|
||||||
static const int kDefaultRpcTimeout = 30000;
|
static const int kDefaultRpcTimeout = 30000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time to wait for an RPC connection before failing
|
||||||
|
* Default: 30000
|
||||||
|
**/
|
||||||
|
int rpc_connect_timeout;
|
||||||
|
static const int kDefaultRpcConnectTimeout = 30000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maximum number of retries for RPC operations
|
* Maximum number of retries for RPC operations
|
||||||
**/
|
**/
|
||||||
|
@ -45,11 +45,12 @@ Options HdfsConfiguration::GetOptions() {
|
|||||||
Options result;
|
Options result;
|
||||||
|
|
||||||
OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey));
|
OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey));
|
||||||
|
OptionalSet(result.rpc_connect_timeout, GetInt(kIpcClientConnectTimeoutKey));
|
||||||
OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
|
OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
|
||||||
OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
|
OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
|
||||||
OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
|
OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
|
||||||
|
|
||||||
optional<std::string> authentication_value = Get(kHadoopSecurityAuthentication);
|
optional<std::string> authentication_value = Get(kHadoopSecurityAuthenticationKey);
|
||||||
if (authentication_value ) {
|
if (authentication_value ) {
|
||||||
std::string fixed_case_value = fixCase(authentication_value.value());
|
std::string fixed_case_value = fixCase(authentication_value.value());
|
||||||
if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos))
|
if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos))
|
||||||
|
@ -39,9 +39,10 @@ class HdfsConfiguration : public Configuration {
|
|||||||
// Keys to look for in the configuration file
|
// Keys to look for in the configuration file
|
||||||
static constexpr const char * kFsDefaultFsKey = "fs.defaultFS";
|
static constexpr const char * kFsDefaultFsKey = "fs.defaultFS";
|
||||||
static constexpr const char * kDfsClientSocketTimeoutKey = "dfs.client.socket-timeout";
|
static constexpr const char * kDfsClientSocketTimeoutKey = "dfs.client.socket-timeout";
|
||||||
|
static constexpr const char * kIpcClientConnectTimeoutKey = "ipc.client.connect.timeout";
|
||||||
static constexpr const char * kIpcClientConnectMaxRetriesKey = "ipc.client.connect.max.retries";
|
static constexpr const char * kIpcClientConnectMaxRetriesKey = "ipc.client.connect.max.retries";
|
||||||
static constexpr const char * kIpcClientConnectRetryIntervalKey = "ipc.client.connect.retry.interval";
|
static constexpr const char * kIpcClientConnectRetryIntervalKey = "ipc.client.connect.retry.interval";
|
||||||
static constexpr const char * kHadoopSecurityAuthentication = "hadoop.security.authentication";
|
static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication";
|
||||||
static constexpr const char * kHadoopSecurityAuthentication_simple = "simple";
|
static constexpr const char * kHadoopSecurityAuthentication_simple = "simple";
|
||||||
static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos";
|
static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos";
|
||||||
|
|
||||||
|
@ -136,6 +136,11 @@ LogMessage& LogMessage::operator<<(const std::string& str) {
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LogMessage& LogMessage::operator<<(const ::asio::ip::tcp::endpoint& endpoint) {
|
||||||
|
msg_buffer_ << endpoint;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
LogMessage& LogMessage::operator<<(const char *str) {
|
LogMessage& LogMessage::operator<<(const char *str) {
|
||||||
if(str)
|
if(str)
|
||||||
msg_buffer_ << str;
|
msg_buffer_ << str;
|
||||||
@ -213,4 +218,3 @@ const char * LogMessage::component_string() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,8 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
#include <asio/ip/tcp.hpp>
|
||||||
|
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -177,6 +179,7 @@ class LogMessage {
|
|||||||
LogMessage& operator<<(const std::string*);
|
LogMessage& operator<<(const std::string*);
|
||||||
LogMessage& operator<<(const std::string&);
|
LogMessage& operator<<(const std::string&);
|
||||||
|
|
||||||
|
LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
|
||||||
|
|
||||||
//convert to a string "true"/"false"
|
//convert to a string "true"/"false"
|
||||||
LogMessage& operator<<(bool);
|
LogMessage& operator<<(bool);
|
||||||
|
@ -27,7 +27,8 @@ const int Options::kDefaultMaxRpcRetries;
|
|||||||
const int Options::kDefaultRpcRetryDelayMs;
|
const int Options::kDefaultRpcRetryDelayMs;
|
||||||
const unsigned int Options::kDefaultHostExclusionDuration;
|
const unsigned int Options::kDefaultHostExclusionDuration;
|
||||||
|
|
||||||
Options::Options() : rpc_timeout(kDefaultRpcTimeout), max_rpc_retries(kDefaultMaxRpcRetries),
|
Options::Options() : rpc_timeout(kDefaultRpcTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout),
|
||||||
|
max_rpc_retries(kDefaultMaxRpcRetries),
|
||||||
rpc_retry_delay_ms(kDefaultRpcRetryDelayMs),
|
rpc_retry_delay_ms(kDefaultRpcRetryDelayMs),
|
||||||
host_exclusion_duration(kDefaultHostExclusionDuration),
|
host_exclusion_duration(kDefaultHostExclusionDuration),
|
||||||
defaultFS(),
|
defaultFS(),
|
||||||
|
@ -183,7 +183,7 @@ void RpcConnection::HandshakeComplete(const Status &s) {
|
|||||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
if (connected_ == kConnecting) {
|
if (connected_ == kHandshaking) {
|
||||||
auto shared_this = shared_from_this();
|
auto shared_this = shared_from_this();
|
||||||
|
|
||||||
connected_ = kAuthenticating;
|
connected_ = kAuthenticating;
|
||||||
@ -407,7 +407,7 @@ void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> >
|
|||||||
else
|
else
|
||||||
auth_requests_.push_back(r);
|
auth_requests_.push_back(r);
|
||||||
}
|
}
|
||||||
if (connected_ == kConnected || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking
|
if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking
|
||||||
FlushPendingRequests();
|
FlushPendingRequests();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -494,6 +494,7 @@ std::string RpcConnection::ToString(ConnectedState connected) {
|
|||||||
switch(connected) {
|
switch(connected) {
|
||||||
case kNotYetConnected: return "NotYetConnected";
|
case kNotYetConnected: return "NotYetConnected";
|
||||||
case kConnecting: return "Connecting";
|
case kConnecting: return "Connecting";
|
||||||
|
case kHandshaking: return "Handshaking";
|
||||||
case kAuthenticating: return "Authenticating";
|
case kAuthenticating: return "Authenticating";
|
||||||
case kConnected: return "Connected";
|
case kConnected: return "Connected";
|
||||||
case kDisconnected: return "Disconnected";
|
case kDisconnected: return "Disconnected";
|
||||||
|
@ -59,17 +59,20 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
const Options options_;
|
const Options options_;
|
||||||
|
::asio::ip::tcp::endpoint current_endpoint_;
|
||||||
std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
|
std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
|
||||||
NextLayer next_layer_;
|
NextLayer next_layer_;
|
||||||
|
::asio::deadline_timer connect_timer_;
|
||||||
|
|
||||||
void ConnectComplete(const ::asio::error_code &ec);
|
void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
|
||||||
};
|
};
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
|
RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
|
||||||
: RpcConnection(engine),
|
: RpcConnection(engine),
|
||||||
options_(engine->options()),
|
options_(engine->options()),
|
||||||
next_layer_(engine->io_service()) {
|
next_layer_(engine->io_service()),
|
||||||
|
connect_timer_(engine->io_service()) {
|
||||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,20 +132,43 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
|
|||||||
additional_endpoints_ = server;
|
additional_endpoints_ = server;
|
||||||
::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
|
::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
|
||||||
additional_endpoints_.erase(additional_endpoints_.begin());
|
additional_endpoints_.erase(additional_endpoints_.begin());
|
||||||
|
current_endpoint_ = first_endpoint;
|
||||||
|
|
||||||
auto shared_this = shared_from_this();
|
auto shared_this = shared_from_this();
|
||||||
next_layer_.async_connect(first_endpoint, [shared_this, this](const ::asio::error_code &ec) {
|
next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
|
||||||
ConnectComplete(ec);
|
ConnectComplete(ec, first_endpoint);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prompt the timer to timeout
|
||||||
|
auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
|
||||||
|
connect_timer_.expires_from_now(
|
||||||
|
std::chrono::milliseconds(options_.rpc_connect_timeout));
|
||||||
|
connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
|
||||||
|
if (ec)
|
||||||
|
ConnectComplete(ec, first_endpoint);
|
||||||
|
else
|
||||||
|
ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec) {
|
void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
|
||||||
auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
|
auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
|
||||||
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
||||||
|
connect_timer_.cancel();
|
||||||
|
|
||||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
|
||||||
|
|
||||||
|
// Could be an old async connect returning a result after we've moved on
|
||||||
|
if (remote != current_endpoint_) {
|
||||||
|
LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (connected_ != kConnecting) {
|
||||||
|
LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Status status = ToStatus(ec);
|
Status status = ToStatus(ec);
|
||||||
if(event_handlers_) {
|
if(event_handlers_) {
|
||||||
auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
|
auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
|
||||||
@ -159,6 +185,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
|
|||||||
HandshakeComplete(s);
|
HandshakeComplete(s);
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
|
||||||
std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_));
|
std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_));
|
||||||
if(!err.empty()) {
|
if(!err.empty()) {
|
||||||
LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
|
LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
|
||||||
@ -169,9 +196,18 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
|
|||||||
// hit one
|
// hit one
|
||||||
::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
|
::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
|
||||||
additional_endpoints_.erase(additional_endpoints_.begin());
|
additional_endpoints_.erase(additional_endpoints_.begin());
|
||||||
|
current_endpoint_ = next_endpoint;
|
||||||
|
|
||||||
next_layer_.async_connect(next_endpoint, [shared_this, this](const ::asio::error_code &ec) {
|
next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
|
||||||
ConnectComplete(ec);
|
ConnectComplete(ec, next_endpoint);
|
||||||
|
});
|
||||||
|
connect_timer_.expires_from_now(
|
||||||
|
std::chrono::milliseconds(options_.rpc_connect_timeout));
|
||||||
|
connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
|
||||||
|
if (ec)
|
||||||
|
ConnectComplete(ec, next_endpoint);
|
||||||
|
else
|
||||||
|
ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
CommsError(status);
|
CommsError(status);
|
||||||
@ -184,6 +220,7 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
|
|||||||
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
|
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
|
||||||
|
|
||||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
|
||||||
|
connected_ = kHandshaking;
|
||||||
|
|
||||||
auto shared_this = shared_from_this();
|
auto shared_this = shared_from_this();
|
||||||
auto handshake_packet = PrepareHandshakePacket();
|
auto handshake_packet = PrepareHandshakePacket();
|
||||||
@ -250,6 +287,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
|
|||||||
return;
|
return;
|
||||||
case kConnecting:
|
case kConnecting:
|
||||||
return;
|
return;
|
||||||
|
case kHandshaking:
|
||||||
|
return;
|
||||||
case kAuthenticating:
|
case kAuthenticating:
|
||||||
if (auth_requests_.empty()) {
|
if (auth_requests_.empty()) {
|
||||||
return;
|
return;
|
||||||
@ -379,7 +418,7 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
|
|||||||
LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
|
LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
|
||||||
|
|
||||||
request_over_the_wire_.reset();
|
request_over_the_wire_.reset();
|
||||||
if (connected_ == kConnecting || connected_ == kAuthenticating || connected_ == kConnected) {
|
if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
|
||||||
// Don't print out errors, we were expecting a disconnect here
|
// Don't print out errors, we were expecting a disconnect here
|
||||||
SafeDisconnect(get_asio_socket_ptr(&next_layer_));
|
SafeDisconnect(get_asio_socket_ptr(&next_layer_));
|
||||||
}
|
}
|
||||||
|
@ -197,7 +197,7 @@ void RpcEngine::RpcCommsError(
|
|||||||
if (head_action->delayMillis > 0) {
|
if (head_action->delayMillis > 0) {
|
||||||
auto weak_conn = std::weak_ptr<RpcConnection>(conn_);
|
auto weak_conn = std::weak_ptr<RpcConnection>(conn_);
|
||||||
retry_timer.expires_from_now(
|
retry_timer.expires_from_now(
|
||||||
std::chrono::milliseconds(options_.rpc_retry_delay_ms));
|
std::chrono::milliseconds(head_action->delayMillis));
|
||||||
retry_timer.async_wait([this, weak_conn](asio::error_code ec) {
|
retry_timer.async_wait([this, weak_conn](asio::error_code ec) {
|
||||||
auto strong_conn = weak_conn.lock();
|
auto strong_conn = weak_conn.lock();
|
||||||
if ( (!ec) && (strong_conn) ) {
|
if ( (!ec) && (strong_conn) ) {
|
||||||
|
@ -207,6 +207,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
|
|||||||
enum ConnectedState {
|
enum ConnectedState {
|
||||||
kNotYetConnected,
|
kNotYetConnected,
|
||||||
kConnecting,
|
kConnecting,
|
||||||
|
kHandshaking,
|
||||||
kAuthenticating,
|
kAuthenticating,
|
||||||
kConnected,
|
kConnected,
|
||||||
kDisconnected
|
kDisconnected
|
||||||
|
@ -41,17 +41,25 @@ TEST(HdfsConfigurationTest, TestSetOptions)
|
|||||||
// Completely empty stream
|
// Completely empty stream
|
||||||
{
|
{
|
||||||
std::stringstream stream;
|
std::stringstream stream;
|
||||||
simpleConfigStream(stream, HdfsConfiguration::kDfsClientSocketTimeoutKey, 100,
|
simpleConfigStream(stream,
|
||||||
|
HdfsConfiguration::kFsDefaultFsKey, "/FDFK",
|
||||||
|
HdfsConfiguration::kDfsClientSocketTimeoutKey, 100,
|
||||||
HdfsConfiguration::kIpcClientConnectMaxRetriesKey, 101,
|
HdfsConfiguration::kIpcClientConnectMaxRetriesKey, 101,
|
||||||
HdfsConfiguration::kIpcClientConnectRetryIntervalKey, 102);
|
HdfsConfiguration::kIpcClientConnectRetryIntervalKey, 102,
|
||||||
|
HdfsConfiguration::kIpcClientConnectTimeoutKey, 103,
|
||||||
|
HdfsConfiguration::kHadoopSecurityAuthenticationKey, HdfsConfiguration::kHadoopSecurityAuthentication_kerberos
|
||||||
|
);
|
||||||
|
|
||||||
optional<HdfsConfiguration> config = ConfigurationLoader().Load<HdfsConfiguration>(stream.str());
|
optional<HdfsConfiguration> config = ConfigurationLoader().Load<HdfsConfiguration>(stream.str());
|
||||||
EXPECT_TRUE(config && "Read stream");
|
EXPECT_TRUE(config && "Read stream");
|
||||||
Options options = config->GetOptions();
|
Options options = config->GetOptions();
|
||||||
|
|
||||||
|
EXPECT_EQ("/FDFK", options.defaultFS.str());
|
||||||
EXPECT_EQ(100, options.rpc_timeout);
|
EXPECT_EQ(100, options.rpc_timeout);
|
||||||
EXPECT_EQ(101, options.max_rpc_retries);
|
EXPECT_EQ(101, options.max_rpc_retries);
|
||||||
EXPECT_EQ(102, options.rpc_retry_delay_ms);
|
EXPECT_EQ(102, options.rpc_retry_delay_ms);
|
||||||
|
EXPECT_EQ(103, options.rpc_connect_timeout);
|
||||||
|
EXPECT_EQ(Options::kKerberos, options.authentication);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user