diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc index cc65ec27bc..32359f9b6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -19,6 +19,7 @@ #include "common/util.h" #include +#include namespace hdfs { @@ -64,4 +65,34 @@ std::string GetRandomClientName() { return ss.str(); } +std::string SafeDisconnect(asio::ip::tcp::socket *sock) { + std::string err; + if(sock && sock->is_open()) { + /** + * Even though we just checked that the socket is open it's possible + * it isn't in a state where it can properly send or receive. If that's + * the case asio will turn the underlying error codes from shutdown() + * and close() into unhelpfully named std::exceptions. Due to the + * relatively innocuous nature of most of these error codes it's better + * to just catch and return a flag so the caller can log failure. + **/ + + try { + sock->shutdown(asio::ip::tcp::socket::shutdown_both); + } catch (const std::exception &e) { + err = std::string("shutdown() threw") + e.what(); + } + + try { + sock->close(); + } catch (const std::exception &e) { + // don't append if shutdown() already failed, first failure is the useful one + if(err.empty()) + err = std::string("close() threw") + e.what(); + } + + } + return err; +} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index 870ce2e7e5..a6616c69b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -21,16 +21,22 @@ #include "hdfspp/status.h" #include +#include +#include #include #include #include #include - +#include namespace hdfs { +// typedefs based on code that's repeated everywhere +typedef std::lock_guard mutex_guard; + + static inline Status ToStatus(const ::asio::error_code &ec) { if (ec) { return Status(ec.value(), ec.message().c_str()); @@ -71,6 +77,11 @@ bool lock_held(T & mutex) { return result; } +// Shutdown and close a socket safely; will check if the socket is open and +// catch anything thrown by asio. +// Returns a string containing error message on failure, otherwise an empty string. +std::string SafeDisconnect(asio::ip::tcp::socket *sock); + } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc index be36fceae3..acc80c9e53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -47,6 +47,7 @@ DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service, void DataNodeConnectionImpl::Connect( std::function dn)> handler) { // Keep the DN from being freed until we're done + mutex_guard state_lock(state_lock_); auto shared_this = shared_from_this(); asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(), [shared_this, handler](const asio::error_code &ec, std::array::iterator it) { @@ -55,7 +56,11 @@ void DataNodeConnectionImpl::Connect( } void DataNodeConnectionImpl::Cancel() { - conn_.reset(); + mutex_guard state_lock(state_lock_); + std::string err = SafeDisconnect(conn_.get()); + if(!err.empty()) { + LOG_WARN(kBlockReader, << "Error disconnecting socket in DataNodeConnectionImpl::Cancel, " << err); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h index 96f26599b0..aa193f3e65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -23,11 +23,10 @@ #include "ClientNamenodeProtocol.pb.h" #include "common/libhdfs_events_impl.h" #include "common/logging.h" +#include "common/util.h" #include "asio.hpp" -#include - namespace hdfs { class DataNodeConnection : public AsyncStream { @@ -43,31 +42,19 @@ public: struct SocketDeleter { inline void operator()(asio::ip::tcp::socket *sock) { - if(sock->is_open()) { - /** - * Even though we just checked that the socket is open it's possible - * it isn't in a state where it can properly send or receive. If that's - * the case asio will turn the underlying error codes from shutdown() - * and close() into unhelpfully named std::exceptions. Due to the - * relatively innocuous nature of most of these error codes it's better - * to just catch, give a warning, and move on with life. - **/ - try { - sock->shutdown(asio::ip::tcp::socket::shutdown_both); - } catch (const std::exception &e) { - LOG_WARN(kBlockReader, << "Error calling socket->shutdown"); - } - try { - sock->close(); - } catch (const std::exception &e) { - LOG_WARN(kBlockReader, << "Error calling socket->close"); - } + // Cancel may have already closed the socket. + std::string err = SafeDisconnect(sock); + if(!err.empty()) { + LOG_WARN(kBlockReader, << "Error disconnecting socket: " << err); } delete sock; } }; class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this{ +private: + // held (briefly) while posting async ops to the asio task queue + std::mutex state_lock_; public: std::unique_ptr conn_; std::array endpoints_; @@ -84,19 +71,21 @@ public: void Cancel() override; void async_read_some(const MutableBuffers &buf, - std::function handler) override { + std::function handler) + override { event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin()); + + mutex_guard state_lock(state_lock_); conn_->async_read_some(buf, handler); }; void async_write_some(const ConstBuffers &buf, - std::function handler) override { - + std::function handler) + override { event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin()); + mutex_guard state_lock(state_lock_); conn_->async_write_some(buf, handler); } };