HDFS-12134: libhdfs++: Add a synchronization interface for the GSSAPI. Contributed by James Clampffer.

This commit is contained in:
James Clampffer 2017-08-07 13:04:50 -04:00
parent 033433bce7
commit 22ea06a3dd
13 changed files with 607 additions and 55 deletions

View File

@ -26,6 +26,7 @@
#include "hdfspp/fsinfo.h" #include "hdfspp/fsinfo.h"
#include "hdfspp/content_summary.h" #include "hdfspp/content_summary.h"
#include "hdfspp/uri.h" #include "hdfspp/uri.h"
#include "hdfspp/locks.h"
#include <functional> #include <functional>
#include <memory> #include <memory>

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef COMMON_HDFS_LOCKS_H_
#define COMMON_HDFS_LOCKS_H_
#include <stdexcept>
#include <string>
#include <atomic>
#include <mutex>
#include <memory>
namespace hdfs
{
//
// Thrown by LockGuard to indicate that it was unable to acquire a mutex
// what_str should contain info about what caused the failure
//
class LockFailure : public std::runtime_error {
public:
LockFailure(const char *what_str) : std::runtime_error(what_str) {};
LockFailure(const std::string& what_str) : std::runtime_error(what_str) {};
};
//
// A pluggable mutex type to allow client code to share mutexes it may
// already use to protect certain system resources. Certain shared
// libraries have some procedures that aren't always implemented in a thread
// safe manner. If libhdfs++ and the code linking it depend on the same
// library this provides a mechanism to coordinate safe access.
//
// Interface provided is intended to be similar to std::mutex. If the lock
// can't be aquired it may throw LockFailure from the lock method. If lock
// does fail libhdfs++ is expected fail as cleanly as possible e.g.
// FileSystem::Mkdirs might return a MutexError but a subsequent call may be
// successful.
//
class Mutex {
public:
virtual ~Mutex() {};
virtual void lock() = 0;
virtual void unlock() = 0;
virtual std::string str() = 0;
};
//
// LockGuard works in a similar manner to std::lock_guard: it locks the mutex
// in the constructor and unlocks it in the destructor.
// Failure to acquire the mutex in the constructor will result in throwing a
// LockFailure exception.
//
class LockGuard {
public:
LockGuard(Mutex *m);
~LockGuard();
private:
Mutex *_mtx;
};
//
// Manage instances of hdfs::Mutex that are intended to be global to the
// process.
//
// LockManager's InitLocks method provides a mechanism for the calling
// application to share its own implementations of hdfs::Mutex. It must be
// called prior to instantiating any FileSystem objects and can only be
// called once. If a lock is not provided a default mutex type wrapping
// std::mutex is used as a default.
//
class LockManager {
public:
// Initializes with a default set of C++11 style mutexes
static bool InitLocks(Mutex *gssapi);
static Mutex *getGssapiMutex();
// Tests only, implementation may no-op on release builds.
// Reset _finalized to false and set all Mutex* members to default values.
static void TEST_reset_manager();
static Mutex *TEST_get_default_mutex();
private:
// Used only in tests.
static Mutex *TEST_default_mutex;
// Use to synchronize calls into GSSAPI/Kerberos libs
static Mutex *gssapiMtx;
// Prevent InitLocks from being called more than once
// Allows all locks to be set a single time atomically
static std::mutex _state_lock;
static bool _finalized;
};
} // end namespace hdfs
#endif

View File

@ -50,6 +50,7 @@ class Status {
static Status PathNotFound(const char *msg); static Status PathNotFound(const char *msg);
static Status InvalidOffset(const char *msg); static Status InvalidOffset(const char *msg);
static Status PathIsNotDirectory(const char *msg); static Status PathIsNotDirectory(const char *msg);
static Status MutexError(const char *msg);
// success // success
bool ok() const { return code_ == 0; } bool ok() const { return code_ == 0; }
@ -79,6 +80,7 @@ class Status {
kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory), kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory),
kFileAlreadyExists = static_cast<unsigned>(std::errc::file_exists), kFileAlreadyExists = static_cast<unsigned>(std::errc::file_exists),
kPathIsNotEmptyDirectory = static_cast<unsigned>(std::errc::directory_not_empty), kPathIsNotEmptyDirectory = static_cast<unsigned>(std::errc::directory_not_empty),
kBusy = static_cast<unsigned>(std::errc::device_or_resource_busy),
// non-errc codes start at 256 // non-errc codes start at 256
kException = 256, kException = 256,

View File

@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl) set(LIB_DL dl)
endif() endif()
add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc) add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>) add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL}) target_link_libraries(common ${LIB_DL})

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "hdfspp/locks.h"
#include <mutex>
namespace hdfs {
LockGuard::LockGuard(Mutex *m) : _mtx(m) {
if(!m) {
throw LockFailure("LockGuard passed invalid (null) Mutex pointer");
}
_mtx->lock();
}
LockGuard::~LockGuard() {
if(_mtx) {
_mtx->unlock();
}
}
// Basic mutexes to use as default. Just a wrapper around C++11 std::mutex.
class DefaultMutex : public Mutex {
public:
DefaultMutex() {}
void lock() override {
// Could throw in here if the implementation couldn't lock for some reason.
_mtx.lock();
}
void unlock() override {
_mtx.unlock();
}
std::string str() override {
return "DefaultMutex";
}
private:
std::mutex _mtx;
};
DefaultMutex defaultTestMutex;
DefaultMutex defaultGssapiMutex;
// LockManager static var instantiation
Mutex *LockManager::TEST_default_mutex = &defaultTestMutex;
Mutex *LockManager::gssapiMtx = &defaultGssapiMutex;
std::mutex LockManager::_state_lock;
bool LockManager::_finalized = false;
bool LockManager::InitLocks(Mutex *gssapi) {
std::lock_guard<std::mutex> guard(_state_lock);
// You get once shot to set this - swapping the locks
// out while in use gets risky. It can still be done by
// using the Mutex as a proxy object if one understands
// the implied risk of doing so.
if(_finalized)
return false;
gssapiMtx = gssapi;
_finalized = true;
return true;
}
Mutex *LockManager::getGssapiMutex() {
std::lock_guard<std::mutex> guard(_state_lock);
return gssapiMtx;
}
Mutex *LockManager::TEST_get_default_mutex() {
return TEST_default_mutex;
}
void LockManager::TEST_reset_manager() {
_finalized = false;
// user still responsible for cleanup
gssapiMtx = &defaultGssapiMutex;
}
} // end namepace hdfs

View File

@ -177,7 +177,16 @@ std::string Status::ToString() const {
} }
bool Status::notWorthRetry() const { bool Status::notWorthRetry() const {
return noRetryExceptions.find(code_) != noRetryExceptions.end(); return noRetryExceptions.find(code_) != noRetryExceptions.end();
}
Status Status::MutexError(const char *msg) {
std::string formatted = "MutexError";
if(msg) {
formatted += ": ";
formatted += msg;
}
return Status(kBusy/*try_lock failure errno*/, msg);
} }
} }

View File

@ -244,6 +244,7 @@ URI URI::parse_from_string(const std::string &str)
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
URI::URI() : _port(-1) {} URI::URI() : _port(-1) {}
URI::Query::Query(const std::string& k, const std::string& v) : key(k), value(v) {} URI::Query::Query(const std::string& k, const std::string& v) : key(k), value(v) {}
std::string URI::str(bool encoded_output) const std::string URI::str(bool encoded_output) const

View File

@ -16,6 +16,8 @@
* limitations under the License. * limitations under the License.
*/ */
#include "hdfspp/locks.h"
#include <sys/types.h> #include <sys/types.h>
#include "sasl/sasl.h" #include "sasl/sasl.h"
#include "sasl/saslutil.h" #include "sasl/saslutil.h"
@ -31,6 +33,9 @@
namespace hdfs { namespace hdfs {
static Mutex *getSaslMutex() {
return LockManager::getGssapiMutex();
}
// Forward decls of sasl callback functions // Forward decls of sasl callback functions
typedef int (*sasl_callback_ft)(void); typedef int (*sasl_callback_ft)(void);
@ -111,23 +116,30 @@ Status CySaslEngine::SaslError( int rc) {
* Cyrus SASL ENGINE * Cyrus SASL ENGINE
*/ */
CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr) CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr)
{ {
// Create an array of callbacks that embed a pointer to this // Create an array of callbacks that embed a pointer to this
// so we can call methods of the engine // so we can call methods of the engine
per_connection_callbacks_ = { per_connection_callbacks_ = {
{ SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ
{ SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT
{ SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm
// { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this
{ SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
}; };
} }
// Cleanup of last resort. Call Finish to allow a safer check on disposal
CySaslEngine::~CySaslEngine() CySaslEngine::~CySaslEngine()
{ {
if (conn_) { if (conn_) {
try {
LockGuard saslGuard(getSaslMutex());
sasl_dispose( &conn_); // undo sasl_client_new() sasl_dispose( &conn_); // undo sasl_client_new()
} catch (const LockFailure& e) {
LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << e.what());
}
} }
} // destructor } // destructor
@ -146,8 +158,15 @@ Status CySaslEngine::InitCyrusSasl()
const char * fqdn = chosen_mech_.serverid.c_str(); const char * fqdn = chosen_mech_.serverid.c_str();
const char * proto = chosen_mech_.protocol.c_str(); const char * proto = chosen_mech_.protocol.c_str();
rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_); try {
if (rc != SASL_OK) return SaslError(rc); LockGuard saslGuard(getSaslMutex());
rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_);
if (rc != SASL_OK) {
return SaslError(rc);
}
} catch (const LockFailure& e) {
return Status::MutexError("mutex that guards sasl_client_new unable to lock");
}
return Status::OK(); return Status::OK();
} // cysasl_new() } // cysasl_new()
@ -176,8 +195,15 @@ CySaslEngine::Start()
const char * chosen_mech; const char * chosen_mech;
std::string token; std::string token;
rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact, try {
(const char **) &buf, &buflen, &chosen_mech); LockGuard saslGuard(getSaslMutex());
rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact,
(const char **) &buf, &buflen, &chosen_mech);
} catch (const LockFailure& e) {
state_ = kFailure;
return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" );
}
switch (rc) { switch (rc) {
case SASL_OK: state_ = kSuccess; case SASL_OK: state_ = kSuccess;
@ -192,6 +218,7 @@ CySaslEngine::Start()
// Cyrus will free this buffer when the connection is shut down // Cyrus will free this buffer when the connection is shut down
token = std::string( buf, buflen); token = std::string( buf, buflen);
return std::make_pair( Status::OK(), token); return std::make_pair( Status::OK(), token);
} // start() method } // start() method
std::pair<Status, std::string> CySaslEngine::Step(const std::string data) std::pair<Status, std::string> CySaslEngine::Step(const std::string data)
@ -203,9 +230,15 @@ std::pair<Status, std::string> CySaslEngine::Step(const std::string data)
if (state_ != kWaitingForData) if (state_ != kWaitingForData)
LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_); LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_);
int rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact, int rc = 0;
(const char **) &output, &outlen); try {
LockGuard saslGuard(getSaslMutex());
rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact,
(const char **) &output, &outlen);
} catch (const LockFailure& e) {
state_ = kFailure;
return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" );
}
// right now, state_ == kWaitingForData, // right now, state_ == kWaitingForData,
// so update state_, to reflect _step()'s result: // so update state_, to reflect _step()'s result:
switch (rc) { switch (rc) {
@ -224,8 +257,13 @@ Status CySaslEngine::Finish()
LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_); LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_);
if (conn_ != nullptr) { if (conn_ != nullptr) {
try {
LockGuard saslGuard(getSaslMutex());
sasl_dispose( &conn_); sasl_dispose( &conn_);
conn_ = NULL; conn_ = NULL;
} catch (const LockFailure& e) {
return Status::MutexError("mutex that guards sasl_dispose unable to lock");
}
} }
return Status::OK(); return Status::OK();
@ -234,6 +272,8 @@ Status CySaslEngine::Finish()
////////////////////////////////////////////////// //////////////////////////////////////////////////
// Internal callbacks, for sasl_init_client(). // // Internal callbacks, for sasl_init_client(). //
// Mostly lifted from cyrus' sample_client.c . // // Mostly lifted from cyrus' sample_client.c . //
// Implicitly called in a context that already //
// holds the SASL/GSSAPI lock. //
////////////////////////////////////////////////// //////////////////////////////////////////////////
static int static int
@ -388,14 +428,26 @@ const sasl_callback_t per_process_callbacks[] = {
CyrusPerProcessData::CyrusPerProcessData() CyrusPerProcessData::CyrusPerProcessData()
{ {
int init_rc = sasl_client_init(per_process_callbacks); try {
init_status_ = make_status(init_rc); LockGuard saslGuard(getSaslMutex());
int init_rc = sasl_client_init(per_process_callbacks);
init_status_ = make_status(init_rc);
} catch (const LockFailure& e) {
init_status_ = Status::MutexError("mutex protecting process-wide sasl_client_init unable to lock");
}
} }
CyrusPerProcessData::~CyrusPerProcessData() CyrusPerProcessData::~CyrusPerProcessData()
{ {
// Undo sasl_client_init()) // Undo sasl_client_init())
sasl_done(); try {
LockGuard saslGuard(getSaslMutex());
sasl_done();
} catch (const LockFailure& e) {
// Not can be done at this point, but the process is most likely shutting down anyway.
LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to lock");
}
} }
Status CyrusPerProcessData::Init() Status CyrusPerProcessData::Init()
@ -405,6 +457,10 @@ Status CyrusPerProcessData::Init()
CyrusPerProcessData & CyrusPerProcessData::GetInstance() CyrusPerProcessData & CyrusPerProcessData::GetInstance()
{ {
// Meyer's singleton, thread safe and lazily initialized in C++11
//
// Must be lazily initialized to allow client code to plug in a GSSAPI mutex
// implementation.
static CyrusPerProcessData per_process_data; static CyrusPerProcessData per_process_data;
return per_process_data; return per_process_data;
} }

View File

@ -16,18 +16,26 @@
* limitations under the License. * limitations under the License.
*/ */
#include "hdfspp/locks.h"
#include <sstream> #include <sstream>
#include <gsasl.h> #include <gsasl.h>
#include "sasl_engine.h" #include "sasl_engine.h"
#include "gsasl_engine.h" #include "gsasl_engine.h"
#include "common/logging.h" #include "common/logging.h"
namespace hdfs { namespace hdfs {
/***************************************************************************** /*****************************************************************************
* GSASL UTILITY FUNCTIONS * GSASL UTILITY FUNCTIONS
*/ */
static Mutex *getSaslMutex() {
return LockManager::getGssapiMutex();
}
static Status rc_to_status(int rc) static Status rc_to_status(int rc)
{ {
if (rc == GSASL_OK) { if (rc == GSASL_OK) {
@ -70,32 +78,45 @@ std::pair<Status, std::string> base64_encode(const std::string & in) {
GSaslEngine::~GSaslEngine() GSaslEngine::~GSaslEngine()
{ {
if (session_ != nullptr) { // These should already be called in this->Finish
try {
LockGuard saslGuard(getSaslMutex());
if (session_ != nullptr) {
gsasl_finish(session_); gsasl_finish(session_);
} }
if (ctx_ != nullptr) { if (ctx_ != nullptr) {
gsasl_done(ctx_); gsasl_done(ctx_);
}
} catch (const LockFailure& e) {
if(session_ || ctx_) {
LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to dispose of gsasl state: " << e.what());
}
} }
} }
Status GSaslEngine::gsasl_new() { Status GSaslEngine::gsasl_new() {
int status = GSASL_OK; int status = GSASL_OK;
if (ctx_) return Status::OK(); if (ctx_) return Status::OK();
status = gsasl_init( & ctx_); try {
LockGuard saslGuard(getSaslMutex());
status = gsasl_init( & ctx_);
} catch (const LockFailure& e) {
return Status::MutexError("Mutex that guards gsasl_init unable to lock");
}
switch ( status) { switch ( status) {
case GSASL_OK: case GSASL_OK:
return Status::OK(); return Status::OK();
case GSASL_MALLOC_ERROR: case GSASL_MALLOC_ERROR:
LOG_WARN(kRPC, << "GSaslEngine: Out of memory."); LOG_WARN(kRPC, << "GSaslEngine: Out of memory.");
return Status::Error("SaslEngine: Out of memory."); return Status::Error("SaslEngine: Out of memory.");
default: default:
LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status); LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status);
return Status::Error("SaslEngine: Unexpected error."); return Status::Error("SaslEngine: Unexpected error.");
} }
} // gsasl_new() } // gsasl_new()
std::pair<Status, std::string> std::pair<Status, std::string>
@ -107,12 +128,22 @@ GSaslEngine::Start()
this->gsasl_new(); this->gsasl_new();
/* Create new authentication session. */ /* Create new authentication session. */
rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_); try {
LockGuard saslGuard(getSaslMutex());
rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_);
} catch (const LockFailure& e) {
state_ = kErrorState;
return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), "");
}
if (rc != GSASL_OK) { if (rc != GSASL_OK) {
state_ = kErrorState; state_ = kErrorState;
return std::make_pair( rc_to_status( rc), std::string("")); return std::make_pair( rc_to_status( rc), std::string(""));
} }
init_kerberos(); Status init_status = init_kerberos();
if(!init_status.ok()) {
state_ = kErrorState;
return std::make_pair(init_status, "");
}
state_ = kWaitingForData; state_ = kWaitingForData;
@ -124,12 +155,17 @@ GSaslEngine::Start()
Status GSaslEngine::init_kerberos() { Status GSaslEngine::init_kerberos() {
//TODO: check that we have a principal //TODO: check that we have a principal
try {
gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str()); LockGuard saslGuard(getSaslMutex());
gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str()); // these don't return anything that indicates failure
gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str()); gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
return Status::OK(); gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str());
gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str());
} catch (const LockFailure& e) {
return Status::MutexError("Mutex that guards gsasl_property_set in GSaslEngine::init_kerberos unable to lock");
} }
return Status::OK();
}
std::pair<Status, std::string> GSaslEngine::Step(const std::string data) { std::pair<Status, std::string> GSaslEngine::Step(const std::string data) {
if (state_ != kWaitingForData) if (state_ != kWaitingForData)
@ -137,8 +173,16 @@ std::pair<Status, std::string> GSaslEngine::Step(const std::string data) {
char * output = NULL; char * output = NULL;
size_t outputSize; size_t outputSize;
int rc = gsasl_step(session_, data.c_str(), data.size(), &output,
int rc = 0;
try {
LockGuard saslGuard(getSaslMutex());
rc = gsasl_step(session_, data.c_str(), data.size(), &output,
&outputSize); &outputSize);
} catch (const LockFailure& e) {
state_ = kFailure;
return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), "");
}
if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) { if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
std::string retval(output, output ? outputSize : 0); std::string retval(output, output ? outputSize : 0);
@ -166,16 +210,20 @@ Status GSaslEngine::Finish()
if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState ) if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState )
LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_); LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_);
if (session_ != nullptr) { try {
LockGuard saslGuard(getSaslMutex());
if (session_ != nullptr) {
gsasl_finish(session_); gsasl_finish(session_);
session_ = NULL; session_ = NULL;
} }
if (ctx_ != nullptr) { if (ctx_ != nullptr) {
gsasl_done(ctx_); gsasl_done(ctx_);
ctx_ = nullptr; ctx_ = nullptr;
}
} catch (const LockFailure& e) {
return Status::MutexError("Mutex that guards sasl state cleanup in GSaslEngine::Finish unable to lock");
} }
return Status::OK(); return Status::OK();
} // finish() method } // finish() method

View File

@ -38,8 +38,6 @@ namespace hdfs {
using namespace hadoop::common; using namespace hadoop::common;
using namespace google::protobuf; using namespace google::protobuf;
template <class T>
using optional = std::experimental::optional<T>;
/***** /*****
* Threading model: all entry points need to acquire the sasl_lock before accessing * Threading model: all entry points need to acquire the sasl_lock before accessing

View File

@ -116,6 +116,9 @@ add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc)
target_link_libraries(hdfs_ioservice_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(hdfs_ioservice_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfs_ioservice hdfs_ioservice_test) add_memcheck_test(hdfs_ioservice hdfs_ioservice_test)
add_executable(user_lock_test user_lock_test.cc)
target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(user_lock user_lock_test)
# #
# #

View File

@ -23,7 +23,6 @@ using ::testing::_;
using namespace hdfs; using namespace hdfs;
URI expect_uri_throw(const char *uri) { URI expect_uri_throw(const char *uri) {
bool threw = false; bool threw = false;
std::string what_msg; std::string what_msg;

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <hdfspp/locks.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
using namespace hdfs;
// try_lock will always return false, unlock will always throw because it
// can never be locked.
class CantLockMutex : public Mutex {
public:
void lock() override {
throw LockFailure("This mutex cannot be locked");
}
void unlock() override {
throw LockFailure("Unlock");
}
std::string str() override {
return "CantLockMutex";
}
};
TEST(UserLockTest, DefaultMutexBasics) {
Mutex *mtx = LockManager::TEST_get_default_mutex();
// lock and unlock twice to make sure unlock works
bool locked = false;
try {
mtx->lock();
locked = true;
} catch (...) {}
EXPECT_TRUE(locked);
mtx->unlock();
locked = false;
try {
mtx->lock();
locked = true;
} catch (...) {}
EXPECT_TRUE(locked);
mtx->unlock();
EXPECT_EQ(mtx->str(), "DefaultMutex");
}
// Make sure lock manager can only be initialized once unless test reset called
TEST(UserLockTest, LockManager) {
std::unique_ptr<CantLockMutex> mtx(new CantLockMutex());
EXPECT_TRUE(mtx != nullptr);
// Check the default lock
Mutex *defaultGssapiMtx = LockManager::getGssapiMutex();
EXPECT_TRUE(defaultGssapiMtx != nullptr);
// Try a double init. Should not work
bool res = LockManager::InitLocks(mtx.get());
EXPECT_TRUE(res);
// Check pointer value
EXPECT_EQ(LockManager::getGssapiMutex(), mtx.get());
res = LockManager::InitLocks(mtx.get());
EXPECT_FALSE(res);
// Make sure test reset still works
LockManager::TEST_reset_manager();
res = LockManager::InitLocks(mtx.get());
EXPECT_TRUE(res);
LockManager::TEST_reset_manager();
EXPECT_EQ(LockManager::getGssapiMutex(), defaultGssapiMtx);
}
TEST(UserLockTest, CheckCantLockMutex) {
std::unique_ptr<CantLockMutex> mtx(new CantLockMutex());
EXPECT_TRUE(mtx != nullptr);
bool locked = false;
try {
mtx->lock();
} catch (...) {}
EXPECT_FALSE(locked);
bool threw_on_unlock = false;
try {
mtx->unlock();
} catch (const LockFailure& e) {
threw_on_unlock = true;
}
EXPECT_TRUE(threw_on_unlock);
EXPECT_EQ("CantLockMutex", mtx->str());
}
TEST(UserLockTest, LockGuardBasics) {
Mutex *goodMtx = LockManager::TEST_get_default_mutex();
CantLockMutex badMtx;
// lock/unlock a few times to increase chances of UB if lock is misused
for(int i=0;i<10;i++) {
bool caught_exception = false;
try {
LockGuard guard(goodMtx);
// now have a scoped lock
} catch (const LockFailure& e) {
caught_exception = true;
}
EXPECT_FALSE(caught_exception);
}
// still do a few times, but expect it to blow up each time
for(int i=0;i<10;i++) {
bool caught_exception = false;
try {
LockGuard guard(&badMtx);
// now have a scoped lock
} catch (const LockFailure& e) {
caught_exception = true;
}
EXPECT_TRUE(caught_exception);
}
}
struct Incrementer {
int64_t& _val;
int64_t _iters;
Mutex *_mtx;
Incrementer(int64_t &val, int64_t iters, Mutex *m)
: _val(val), _iters(iters), _mtx(m) {}
void operator()(){
for(int64_t i=0; i<_iters; i++) {
LockGuard valguard(_mtx);
_val += 1;
}
}
};
struct Decrementer {
int64_t& _val;
int64_t _iters;
Mutex *_mtx;
Decrementer(int64_t &val, int64_t iters, Mutex *m)
: _val(val), _iters(iters), _mtx(m) {}
void operator()(){
for(int64_t i=0; i<_iters; i++) {
LockGuard valguard(_mtx);
_val -= 1;
}
}
};
TEST(UserLockTest, LockGuardConcurrency) {
Mutex *mtx = LockManager::TEST_get_default_mutex();
// Prove that these actually mutate the value
int64_t test_value = 0;
Incrementer inc(test_value, 1000, mtx);
inc();
EXPECT_EQ(test_value, 1000);
Decrementer dec(test_value, 1000, mtx);
dec();
EXPECT_EQ(test_value, 0);
std::vector<std::thread> workers;
std::vector<Incrementer> incrementers;
std::vector<Decrementer> decrementors;
const int delta = 1024 * 1024;
const int threads = 2 * 6;
EXPECT_EQ(threads % 2, 0);
// a bunch of threads race to increment and decrement the value
// if all goes well the operations balance out and the value is unchanged
for(int i=0; i < threads; i++) {
if(i%2 == 0) {
incrementers.emplace_back(test_value, delta, mtx);
workers.emplace_back(incrementers.back());
} else {
decrementors.emplace_back(test_value, delta, mtx);
workers.emplace_back(decrementors.back());
}
}
// join, everything should balance to 0
for(std::thread& thread : workers) {
thread.join();
}
EXPECT_EQ(test_value, 0);
}
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.
::testing::InitGoogleMock(&argc, argv);
int res = RUN_ALL_TESTS();
return res;
}