diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 673455e076..611da21ed3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -26,6 +26,7 @@ #include "hdfspp/fsinfo.h" #include "hdfspp/content_summary.h" #include "hdfspp/uri.h" +#include "hdfspp/locks.h" #include #include diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h new file mode 100644 index 0000000000..3dfeab466f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_LOCKS_H_ +#define COMMON_HDFS_LOCKS_H_ + +#include +#include +#include +#include +#include + +namespace hdfs +{ + +// +// Thrown by LockGuard to indicate that it was unable to acquire a mutex +// what_str should contain info about what caused the failure +// +class LockFailure : public std::runtime_error { + public: + LockFailure(const char *what_str) : std::runtime_error(what_str) {}; + LockFailure(const std::string& what_str) : std::runtime_error(what_str) {}; +}; + +// +// A pluggable mutex type to allow client code to share mutexes it may +// already use to protect certain system resources. Certain shared +// libraries have some procedures that aren't always implemented in a thread +// safe manner. If libhdfs++ and the code linking it depend on the same +// library this provides a mechanism to coordinate safe access. +// +// The interface provided is intended to be similar to std::mutex. If the lock +// can't be acquired it may throw LockFailure from the lock method. If lock +// does fail libhdfs++ is expected to fail as cleanly as possible, e.g. +// FileSystem::Mkdirs might return a MutexError but a subsequent call may be +// successful. +// +class Mutex { + public: + virtual ~Mutex() {}; + virtual void lock() = 0; + virtual void unlock() = 0; + virtual std::string str() = 0; +}; + +// +// LockGuard works in a similar manner to std::lock_guard: it locks the mutex +// in the constructor and unlocks it in the destructor. +// Failure to acquire the mutex in the constructor will result in throwing a +// LockFailure exception.
+// +class LockGuard { + public: + LockGuard(Mutex *m); + ~LockGuard(); + private: + Mutex *_mtx; +}; + +// +// Manage instances of hdfs::Mutex that are intended to be global to the +// process. +// +// LockManager's InitLocks method provides a mechanism for the calling +// application to share its own implementations of hdfs::Mutex. It must be +// called prior to instantiating any FileSystem objects and can only be +// called once. If a lock is not provided a default mutex type wrapping +// std::mutex is used as a default. +// + +class LockManager { + public: + // Initializes with a default set of C++11 style mutexes + static bool InitLocks(Mutex *gssapi); + static Mutex *getGssapiMutex(); + + // Tests only, implementation may no-op on release builds. + // Reset _finalized to false and set all Mutex* members to default values. + static void TEST_reset_manager(); + static Mutex *TEST_get_default_mutex(); + private: + // Used only in tests. + static Mutex *TEST_default_mutex; + // Use to synchronize calls into GSSAPI/Kerberos libs + static Mutex *gssapiMtx; + + // Prevent InitLocks from being called more than once + // Allows all locks to be set a single time atomically + static std::mutex _state_lock; + static bool _finalized; +}; + +} // end namespace hdfs +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h index d0922aec06..718e530c18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h @@ -50,6 +50,7 @@ class Status { static Status PathNotFound(const char *msg); static Status InvalidOffset(const char *msg); static Status PathIsNotDirectory(const char *msg); + static Status MutexError(const char *msg); // success bool ok() const { return code_ == 0; } @@ -79,6 +80,7 @@ class Status { kNotADirectory = static_cast(std::errc::not_a_directory), kFileAlreadyExists = static_cast(std::errc::file_exists), kPathIsNotEmptyDirectory = static_cast(std::errc::directory_not_empty), + kBusy = static_cast(std::errc::device_or_resource_busy), // non-errc codes start at 256 kException = 256, diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index b0b721ab54..15e65c10e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc) +add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc) add_library(common $ $) target_link_libraries(common ${LIB_DL}) diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc new file mode 100644 index 0000000000..30dcb44060 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfspp/locks.h" + +#include + + +namespace hdfs { + +LockGuard::LockGuard(Mutex *m) : _mtx(m) { + if(!m) { + throw LockFailure("LockGuard passed invalid (null) Mutex pointer"); + } + _mtx->lock(); +} + +LockGuard::~LockGuard() { + if(_mtx) { + _mtx->unlock(); + } +} + + +// Basic mutexes to use as default. Just a wrapper around C++11 std::mutex. +class DefaultMutex : public Mutex { + public: + DefaultMutex() {} + + void lock() override { + // Could throw in here if the implementation couldn't lock for some reason. + _mtx.lock(); + } + + void unlock() override { + _mtx.unlock(); + } + + std::string str() override { + return "DefaultMutex"; + } + private: + std::mutex _mtx; +}; + +DefaultMutex defaultTestMutex; +DefaultMutex defaultGssapiMutex; + +// LockManager static var instantiation +Mutex *LockManager::TEST_default_mutex = &defaultTestMutex; +Mutex *LockManager::gssapiMtx = &defaultGssapiMutex; +std::mutex LockManager::_state_lock; +bool LockManager::_finalized = false; + +bool LockManager::InitLocks(Mutex *gssapi) { + std::lock_guard<std::mutex> guard(_state_lock); + + // You get one shot to set this - swapping the locks + // out while in use gets risky. It can still be done by + // using the Mutex as a proxy object if one understands + // the implied risk of doing so.
+ if(_finalized) + return false; + + gssapiMtx = gssapi; + _finalized = true; + return true; +} + +Mutex *LockManager::getGssapiMutex() { + std::lock_guard<std::mutex> guard(_state_lock); + return gssapiMtx; +} + +Mutex *LockManager::TEST_get_default_mutex() { + return TEST_default_mutex; +} + +void LockManager::TEST_reset_manager() { + _finalized = false; + // user still responsible for cleanup + gssapiMtx = &defaultGssapiMutex; +} + +} // end namespace hdfs diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc index 590355381e..4c5c7be216 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc @@ -177,7 +177,16 @@ std::string Status::ToString() const { } bool Status::notWorthRetry() const { - return noRetryExceptions.find(code_) != noRetryExceptions.end(); + return noRetryExceptions.find(code_) != noRetryExceptions.end(); +} + +Status Status::MutexError(const char *msg) { + std::string formatted = "MutexError"; + if(msg) { + formatted += ": "; + formatted += msg; + } + return Status(kBusy/*try_lock failure errno*/, formatted.c_str()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc index 2213f8b79f..9e9319b877 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc @@ -244,6 +244,7 @@ URI URI::parse_from_string(const std::string &str) /////////////////////////////////////////////////////////////////////////////// URI::URI() : _port(-1) {} + URI::Query::Query(const std::string& k, const std::string& v) : key(k), value(v) {} std::string URI::str(bool encoded_output) const diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc index 69b2267a3e..5c96edea5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc @@ -16,6 +16,8 @@ * limitations under the License.
*/ +#include "hdfspp/locks.h" + #include #include "sasl/sasl.h" #include "sasl/saslutil.h" @@ -31,6 +33,9 @@ namespace hdfs { +static Mutex *getSaslMutex() { + return LockManager::getGssapiMutex(); +} // Forward decls of sasl callback functions typedef int (*sasl_callback_ft)(void); @@ -111,23 +116,30 @@ Status CySaslEngine::SaslError( int rc) { * Cyrus SASL ENGINE */ - CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr) - { - // Create an array of callbacks that embed a pointer to this - // so we can call methods of the engine - per_connection_callbacks_ = { - { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ - { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT - { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm - // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this - { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} - }; - } +CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr) +{ + // Create an array of callbacks that embed a pointer to this + // so we can call methods of the engine + per_connection_callbacks_ = { + { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ + { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT + { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm + // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this + { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} + }; +} +// Cleanup of last resort. Call Finish to allow a safer check on disposal CySaslEngine::~CySaslEngine() { + if (conn_) { + try { + LockGuard saslGuard(getSaslMutex()); sasl_dispose( &conn_); // undo sasl_client_new() + } catch (const LockFailure& e) { + LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << e.what()); + } } } // destructor @@ -146,8 +158,15 @@ Status CySaslEngine::InitCyrusSasl() const char * fqdn = chosen_mech_.serverid.c_str(); const char * proto = chosen_mech_.protocol.c_str(); - rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_); - if (rc != SASL_OK) return SaslError(rc); + try { + LockGuard saslGuard(getSaslMutex()); + rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_); + if (rc != SASL_OK) { + return SaslError(rc); + } + } catch (const LockFailure& e) { + return Status::MutexError("mutex that guards sasl_client_new unable to lock"); + } return Status::OK(); } // cysasl_new() @@ -176,8 +195,15 @@ CySaslEngine::Start() const char * chosen_mech; std::string token; - rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact, - (const char **) &buf, &buflen, &chosen_mech); + try { + LockGuard saslGuard(getSaslMutex()); + rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact, + (const char **) &buf, &buflen, &chosen_mech); + } catch (const LockFailure& e) { + state_ = kFailure; + return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" ); + } + switch (rc) { case SASL_OK: state_ = kSuccess; @@ -192,6 +218,7 @@ CySaslEngine::Start() // Cyrus will free this buffer when the connection is shut down token = std::string( buf, buflen); return std::make_pair( Status::OK(), token); + } // start() method std::pair CySaslEngine::Step(const std::string data) @@ -203,9 +230,15 @@ std::pair CySaslEngine::Step(const std::string data) if (state_ != kWaitingForData) LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_); - int rc = sasl_client_step(conn_, 
data.c_str(), data.size(), &client_interact, - (const char **) &output, &outlen); - + int rc = 0; + try { + LockGuard saslGuard(getSaslMutex()); + rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact, + (const char **) &output, &outlen); + } catch (const LockFailure& e) { + state_ = kFailure; + return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" ); + } // right now, state_ == kWaitingForData, // so update state_, to reflect _step()'s result: switch (rc) { @@ -224,8 +257,13 @@ Status CySaslEngine::Finish() LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_); if (conn_ != nullptr) { + try { + LockGuard saslGuard(getSaslMutex()); sasl_dispose( &conn_); conn_ = NULL; + } catch (const LockFailure& e) { + return Status::MutexError("mutex that guards sasl_dispose unable to lock"); + } } return Status::OK(); @@ -234,6 +272,8 @@ Status CySaslEngine::Finish() ////////////////////////////////////////////////// // Internal callbacks, for sasl_init_client(). // // Mostly lifted from cyrus' sample_client.c . // +// Implicitly called in a context that already // +// holds the SASL/GSSAPI lock. // ////////////////////////////////////////////////// static int @@ -388,14 +428,26 @@ const sasl_callback_t per_process_callbacks[] = { CyrusPerProcessData::CyrusPerProcessData() { - int init_rc = sasl_client_init(per_process_callbacks); - init_status_ = make_status(init_rc); + try { + LockGuard saslGuard(getSaslMutex()); + int init_rc = sasl_client_init(per_process_callbacks); + init_status_ = make_status(init_rc); + } catch (const LockFailure& e) { + init_status_ = Status::MutexError("mutex protecting process-wide sasl_client_init unable to lock"); + } } CyrusPerProcessData::~CyrusPerProcessData() { // Undo sasl_client_init()) - sasl_done(); + try { + LockGuard saslGuard(getSaslMutex()); + sasl_done(); + } catch (const LockFailure& e) { + // Not can be done at this point, but the process is most likely shutting down anyway. + LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to lock"); + } + } Status CyrusPerProcessData::Init() @@ -405,6 +457,10 @@ Status CyrusPerProcessData::Init() CyrusPerProcessData & CyrusPerProcessData::GetInstance() { + // Meyer's singleton, thread safe and lazily initialized in C++11 + // + // Must be lazily initialized to allow client code to plug in a GSSAPI mutex + // implementation. static CyrusPerProcessData per_process_data; return per_process_data; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc index 8286bac6bf..7705c81551 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc @@ -16,18 +16,26 @@ * limitations under the License. 
*/ +#include "hdfspp/locks.h" + #include #include #include "sasl_engine.h" #include "gsasl_engine.h" #include "common/logging.h" + namespace hdfs { + /***************************************************************************** * GSASL UTILITY FUNCTIONS */ +static Mutex *getSaslMutex() { + return LockManager::getGssapiMutex(); +} + static Status rc_to_status(int rc) { if (rc == GSASL_OK) { @@ -70,32 +78,45 @@ std::pair base64_encode(const std::string & in) { GSaslEngine::~GSaslEngine() { - if (session_ != nullptr) { + // These should already be called in this->Finish + try { + LockGuard saslGuard(getSaslMutex()); + if (session_ != nullptr) { gsasl_finish(session_); - } + } - if (ctx_ != nullptr) { + if (ctx_ != nullptr) { gsasl_done(ctx_); + } + } catch (const LockFailure& e) { + if(session_ || ctx_) { + LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to dispose of gsasl state: " << e.what()); + } } } Status GSaslEngine::gsasl_new() { - int status = GSASL_OK; + int status = GSASL_OK; - if (ctx_) return Status::OK(); + if (ctx_) return Status::OK(); - status = gsasl_init( & ctx_); + try { + LockGuard saslGuard(getSaslMutex()); + status = gsasl_init( & ctx_); + } catch (const LockFailure& e) { + return Status::MutexError("Mutex that guards gsasl_init unable to lock"); + } - switch ( status) { - case GSASL_OK: - return Status::OK(); - case GSASL_MALLOC_ERROR: - LOG_WARN(kRPC, << "GSaslEngine: Out of memory."); - return Status::Error("SaslEngine: Out of memory."); - default: - LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status); - return Status::Error("SaslEngine: Unexpected error."); - } + switch ( status) { + case GSASL_OK: + return Status::OK(); + case GSASL_MALLOC_ERROR: + LOG_WARN(kRPC, << "GSaslEngine: Out of memory."); + return Status::Error("SaslEngine: Out of memory."); + default: + LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status); + return Status::Error("SaslEngine: Unexpected error."); + } } // gsasl_new() std::pair @@ -107,12 +128,22 @@ GSaslEngine::Start() this->gsasl_new(); /* Create new authentication session. 
*/ - rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_); + try { + LockGuard saslGuard(getSaslMutex()); + rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_); + } catch (const LockFailure& e) { + state_ = kErrorState; + return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), ""); + } if (rc != GSASL_OK) { state_ = kErrorState; return std::make_pair( rc_to_status( rc), std::string("")); } - init_kerberos(); + Status init_status = init_kerberos(); + if(!init_status.ok()) { + state_ = kErrorState; + return std::make_pair(init_status, ""); + } state_ = kWaitingForData; @@ -124,12 +155,17 @@ GSaslEngine::Start() Status GSaslEngine::init_kerberos() { //TODO: check that we have a principal - - gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str()); - gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str()); - gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str()); - return Status::OK(); + try { + LockGuard saslGuard(getSaslMutex()); + // these don't return anything that indicates failure + gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str()); + gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str()); + gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str()); + } catch (const LockFailure& e) { + return Status::MutexError("Mutex that guards gsasl_property_set in GSaslEngine::init_kerberos unable to lock"); } + return Status::OK(); +} std::pair GSaslEngine::Step(const std::string data) { if (state_ != kWaitingForData) @@ -137,8 +173,16 @@ std::pair GSaslEngine::Step(const std::string data) { char * output = NULL; size_t outputSize; - int rc = gsasl_step(session_, data.c_str(), data.size(), &output, + + int rc = 0; + try { + LockGuard saslGuard(getSaslMutex()); + rc = gsasl_step(session_, data.c_str(), data.size(), &output, &outputSize); + } catch (const LockFailure& e) { + state_ = kFailure; + return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), ""); + } if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) { std::string retval(output, output ? 
outputSize : 0); @@ -166,16 +210,20 @@ Status GSaslEngine::Finish() if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState ) LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_); - if (session_ != nullptr) { + try { + LockGuard saslGuard(getSaslMutex()); + if (session_ != nullptr) { gsasl_finish(session_); session_ = NULL; - } + } - if (ctx_ != nullptr) { + if (ctx_ != nullptr) { gsasl_done(ctx_); ctx_ = nullptr; + } + } catch (const LockFailure& e) { + return Status::MutexError("Mutex that guards sasl state cleanup in GSaslEngine::Finish unable to lock"); } - return Status::OK(); } // finish() method diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc index ad8191b567..0957ea377c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc @@ -38,8 +38,6 @@ namespace hdfs { using namespace hadoop::common; using namespace google::protobuf; -template -using optional = std::experimental::optional; /***** * Threading model: all entry points need to acquire the sasl_lock before accessing diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 395fad5b94..0b4581e104 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -116,6 +116,9 @@ add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc) target_link_libraries(hdfs_ioservice_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) add_memcheck_test(hdfs_ioservice hdfs_ioservice_test) +add_executable(user_lock_test user_lock_test.cc) +target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +add_memcheck_test(user_lock user_lock_test) # # diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc index 78f1a58cb3..97f0afd78d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc @@ -23,7 +23,6 @@ using ::testing::_; using namespace hdfs; - URI expect_uri_throw(const char *uri) { bool threw = false; std::string what_msg; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc new file mode 100644 index 0000000000..6df47b2992 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include +#include +#include +#include +#include + +using namespace hdfs; + +// lock will always throw, and unlock will always throw because it +// can never be locked. +class CantLockMutex : public Mutex { + public: + void lock() override { + throw LockFailure("This mutex cannot be locked"); + } + void unlock() override { + throw LockFailure("Unlock"); + } + std::string str() override { + return "CantLockMutex"; + } +}; + +TEST(UserLockTest, DefaultMutexBasics) { + Mutex *mtx = LockManager::TEST_get_default_mutex(); + + // lock and unlock twice to make sure unlock works + bool locked = false; + try { + mtx->lock(); + locked = true; + } catch (...) {} + EXPECT_TRUE(locked); + mtx->unlock(); + + locked = false; + try { + mtx->lock(); + locked = true; + } catch (...) {} + EXPECT_TRUE(locked); + mtx->unlock(); + + EXPECT_EQ(mtx->str(), "DefaultMutex"); +} + + +// Make sure lock manager can only be initialized once unless test reset called +TEST(UserLockTest, LockManager) { + std::unique_ptr<CantLockMutex> mtx(new CantLockMutex()); + EXPECT_TRUE(mtx != nullptr); + + // Check the default lock + Mutex *defaultGssapiMtx = LockManager::getGssapiMutex(); + EXPECT_TRUE(defaultGssapiMtx != nullptr); + + // First init should work; a second init further down should not + bool res = LockManager::InitLocks(mtx.get()); + EXPECT_TRUE(res); + + // Check pointer value + EXPECT_EQ(LockManager::getGssapiMutex(), mtx.get()); + + res = LockManager::InitLocks(mtx.get()); + EXPECT_FALSE(res); + + // Make sure test reset still works + LockManager::TEST_reset_manager(); + res = LockManager::InitLocks(mtx.get()); + EXPECT_TRUE(res); + LockManager::TEST_reset_manager(); + EXPECT_EQ(LockManager::getGssapiMutex(), defaultGssapiMtx); +} + +TEST(UserLockTest, CheckCantLockMutex) { + std::unique_ptr<CantLockMutex> mtx(new CantLockMutex()); + EXPECT_TRUE(mtx != nullptr); + + bool locked = false; + try { + mtx->lock(); + locked = true; + } catch (...) 
{} + EXPECT_FALSE(locked); + + bool threw_on_unlock = false; + try { + mtx->unlock(); + } catch (const LockFailure& e) { + threw_on_unlock = true; + } + EXPECT_TRUE(threw_on_unlock); + + EXPECT_EQ("CantLockMutex", mtx->str()); +} + +TEST(UserLockTest, LockGuardBasics) { + Mutex *goodMtx = LockManager::TEST_get_default_mutex(); + CantLockMutex badMtx; + + // lock/unlock a few times to increase chances of UB if lock is misused + for(int i=0;i<10;i++) { + bool caught_exception = false; + try { + LockGuard guard(goodMtx); + // now have a scoped lock + } catch (const LockFailure& e) { + caught_exception = true; + } + EXPECT_FALSE(caught_exception); + } + + // still do a few times, but expect it to blow up each time + for(int i=0;i<10;i++) { + bool caught_exception = false; + try { + LockGuard guard(&badMtx); + // now have a scoped lock + } catch (const LockFailure& e) { + caught_exception = true; + } + EXPECT_TRUE(caught_exception); + } + +} + +struct Incrementer { + int64_t& _val; + int64_t _iters; + Mutex *_mtx; + Incrementer(int64_t &val, int64_t iters, Mutex *m) + : _val(val), _iters(iters), _mtx(m) {} + void operator()(){ + for(int64_t i=0; i<_iters; i++) { + LockGuard valguard(_mtx); + _val += 1; + } + } +}; + +struct Decrementer { + int64_t& _val; + int64_t _iters; + Mutex *_mtx; + Decrementer(int64_t &val, int64_t iters, Mutex *m) + : _val(val), _iters(iters), _mtx(m) {} + void operator()(){ + for(int64_t i=0; i<_iters; i++) { + LockGuard valguard(_mtx); + _val -= 1; + } + } +}; + +TEST(UserLockTest, LockGuardConcurrency) { + Mutex *mtx = LockManager::TEST_get_default_mutex(); + + // Prove that these actually mutate the value + int64_t test_value = 0; + Incrementer inc(test_value, 1000, mtx); + inc(); + EXPECT_EQ(test_value, 1000); + + Decrementer dec(test_value, 1000, mtx); + dec(); + EXPECT_EQ(test_value, 0); + + std::vector<std::thread> workers; + std::vector<Incrementer> incrementers; + std::vector<Decrementer> decrementors; + + const int delta = 1024 * 1024; + const int threads = 2 * 6; + EXPECT_EQ(threads % 2, 0); + + // a bunch of threads race to increment and decrement the value + // if all goes well the operations balance out and the value is unchanged + for(int i=0; i < threads; i++) { + if(i%2 == 0) { + incrementers.emplace_back(test_value, delta, mtx); + workers.emplace_back(incrementers.back()); + } else { + decrementors.emplace_back(test_value, delta, mtx); + workers.emplace_back(decrementors.back()); + } + } + + // join, everything should balance to 0 + for(std::thread& thread : workers) { + thread.join(); + } + EXPECT_EQ(test_value, 0); +} + + +int main(int argc, char *argv[]) { + + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int res = RUN_ALL_TESTS(); + return res; +}
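The sketch below is not part of the patch; it illustrates how a calling application might plug its own mutex into the LockManager introduced above so that its GSSAPI/Kerberos calls and libhdfs++'s are serialized by the same lock. The AppGssapiMutex class and the main() wiring are hypothetical names used for illustration; only hdfs::Mutex, hdfs::LockGuard, hdfs::LockFailure, and hdfs::LockManager's InitLocks/getGssapiMutex come from hdfspp/locks.h as added in this change. Per the header comment, InitLocks must run before the first FileSystem is instantiated and only the first call takes effect.

#include "hdfspp/locks.h"   // Mutex, LockGuard, LockFailure, LockManager (added by this patch)

#include <iostream>
#include <mutex>
#include <string>

// Hypothetical adapter: expose a mutex the application already uses around its own
// GSSAPI/Kerberos calls through the hdfs::Mutex interface so libhdfs++ can share it.
class AppGssapiMutex : public hdfs::Mutex {
 public:
  explicit AppGssapiMutex(std::mutex &m) : mtx_(m) {}
  void lock() override {
    // A richer implementation could throw hdfs::LockFailure if the lock can't be taken.
    mtx_.lock();
  }
  void unlock() override { mtx_.unlock(); }
  std::string str() override { return "AppGssapiMutex"; }
 private:
  std::mutex &mtx_;   // owned by the application, shared with libhdfs++
};

static std::mutex app_gssapi_lock;                          // the application's existing lock
static AppGssapiMutex shared_gssapi_mutex(app_gssapi_lock);

int main() {
  // Must happen before any hdfs::FileSystem is instantiated; returns false if the
  // locks were already finalized by an earlier call.
  if (!hdfs::LockManager::InitLocks(&shared_gssapi_mutex)) {
    std::cerr << "LockManager already initialized, default gssapi mutex stays in place\n";
  }

  // Application-side GSSAPI work can now take the same lock libhdfs++ uses around
  // sasl_client_init()/gsasl_init() and friends, via the RAII LockGuard.
  try {
    hdfs::LockGuard guard(hdfs::LockManager::getGssapiMutex());
    // ... call into the application's GSSAPI/Kerberos code here ...
  } catch (const hdfs::LockFailure &e) {
    std::cerr << "could not take gssapi lock: " << e.what() << "\n";
  }
  return 0;
}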