HDFS-10796: libhdfs++: rationalize ioservice interactions. Contributed by James Clampffer.

Authored by James on 2016-12-09 18:06:06 -05:00; committed by James Clampffer
parent dd7837c429
commit fbff671e0f
11 changed files with 291 additions and 73 deletions


@@ -39,24 +39,47 @@ namespace hdfs {
* When an operation is queued into an IoService, the IoService will
* run the callback handler associated with the operation. Note that
* the IoService must be stopped before destructing the objects that
* file the operations.
* post the operations.
*
* From an implementation point of view the IoService object wraps the
* ::asio::io_service objects. Please see the related documentation
* for more details.
* From an implementation point of view the hdfs::IoService provides
* a thin wrapper over an asio::io_service object so that additional
* instrumentation and functionality can be added.
**/
class IoService {
class IoService : public std::enable_shared_from_this<IoService>
{
public:
static IoService *New();
static std::shared_ptr<IoService> MakeShared();
virtual ~IoService();
/**
* Start up as many threads as there are logical processors.
* Return number of threads created.
**/
virtual unsigned int InitDefaultWorkers() = 0;
/**
* Initialize with thread_count handler threads.
* If thread_count is less than one, print a log message and default to one thread.
* Return number of threads created.
**/
virtual unsigned int InitWorkers(unsigned int thread_count) = 0;
/**
* Place an item on the execution queue. The task will be invoked on a worker thread, outside of the calling context.
**/
virtual void PostTask(std::function<void(void)>& asyncTask) = 0;
/**
* Run the asynchronous tasks associated with this IoService.
**/
virtual void Run() = 0;
/**
* Stop running asynchronous tasks associated with this IoService.
* All worker threads will return as soon as they finish executing their current task.
**/
virtual void Stop() = 0;
virtual ~IoService();
};
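For reference, a minimal usage sketch of the interface above (hypothetical caller code, not part of this patch; the public hdfspp.h header path is assumed):

#include "hdfspp/hdfspp.h"   // assumed public header that declares hdfs::IoService

#include <functional>
#include <iostream>
#include <memory>

void io_service_usage_sketch() {
  std::shared_ptr<hdfs::IoService> io_service = hdfs::IoService::MakeShared();

  // One worker per logical processor; returns how many threads were actually created.
  unsigned int workers = io_service->InitDefaultWorkers();
  if (workers < 1) {
    return;  // with no workers, queued tasks would never run
  }

  // Tasks run on the worker threads, not in the calling context.
  std::function<void(void)> task = []() { std::cout << "ran on a worker\n"; };
  io_service->PostTask(task);

  // Stop before destroying anything the queued tasks might still touch.
  io_service->Stop();
}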
/**
@@ -163,6 +186,13 @@ class FileSystem {
static FileSystem *New(
IoService *&io_service, const std::string &user_name, const Options &options);
/**
* Works the same as the other FileSystem::New but takes shared ownership of an existing IoService.
* The shared IoService is expected to already have its worker threads initialized.
**/
static FileSystem *New(
std::shared_ptr<IoService>, const std::string &user_name, const Options &options);
/**
* Returns a new instance with default user and option, with the default IOService.
**/
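The point of the shared_ptr overload is sharing one worker pool between several FileSystem instances. A hedged sketch of that pattern (user names are placeholders, and error handling is omitted):

#include "hdfspp/hdfspp.h"  // assumed public header

#include <memory>

void shared_io_service_sketch(const hdfs::Options &options) {
  std::shared_ptr<hdfs::IoService> io_service = hdfs::IoService::MakeShared();

  // The shared IoService must have workers before it is handed to a FileSystem.
  if (io_service->InitWorkers(4) < 1) {
    return;
  }

  // Both filesystems run their async work on the same four worker threads.
  hdfs::FileSystem *fs_a = hdfs::FileSystem::New(io_service, "alice", options);
  hdfs::FileSystem *fs_b = hdfs::FileSystem::New(io_service, "bob", options);

  // ... Connect() and use fs_a / fs_b ...

  delete fs_a;
  delete fs_b;
  // ~FileSystemImpl calls Stop(); the workers are joined once the last
  // reference to io_service goes away.
}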


@@ -123,6 +123,13 @@ struct Options {
long block_size;
static const long kDefaultBlockSize = 128*1024*1024;
/**
* Asio worker thread count
* default: -1 (use one worker thread per hardware thread)
**/
int io_threads_;
static const int kDefaultIoThreads = -1;
Options();
};
}
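A short illustration of the new option (a sketch; the user name is a placeholder). Leaving io_threads_ at -1 asks for one worker per hardware thread, while a positive value fixes the pool size; the raw-pointer FileSystem::New overload is the one that honors it.

#include "hdfspp/hdfspp.h"  // assumed public header

hdfs::FileSystem *make_fs_with_two_io_threads() {
  hdfs::Options options;
  options.io_threads_ = 2;  // -1 (the default) means one worker per hardware thread

  // This overload takes ownership of the IoService and nulls the caller's
  // pointer (the "poor man's move" in the constructor), then spawns
  // options.io_threads_ worker threads.
  hdfs::IoService *io_service = hdfs::IoService::New();
  return hdfs::FileSystem::New(io_service, "hdfs_user", options);
}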


@@ -18,14 +18,105 @@
#include "hdfs_ioservice.h"
#include <thread>
#include <mutex>
#include <vector>
#include "common/logging.h"
namespace hdfs {
IoService::~IoService() {}
IoService *IoService::New() { return new IoServiceImpl(); }
IoService *IoService::New() {
return new IoServiceImpl();
}
std::shared_ptr<IoService> IoService::MakeShared() {
return std::make_shared<IoServiceImpl>();
}
unsigned int IoServiceImpl::InitDefaultWorkers() {
LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called.");
unsigned int logical_thread_count = std::thread::hardware_concurrency();
#ifndef DISABLE_CONCURRENT_WORKERS
if(logical_thread_count < 1) {
LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors. Defaulting to 1 worker thread.");
} else {
LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each.");
}
#else
if(logical_thread_count > 0) {
LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available. Concurrent workers are disabled so 1 worker thread will be used");
}
logical_thread_count = 1;
#endif
return InitWorkers(logical_thread_count);
}
unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) {
#ifdef DISABLE_CONCURRENT_WORKERS
LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used");
thread_count = 1;
#endif
if(thread_count < 1) {
LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads requested, defaulting to 1 worker thread.");
thread_count = 1;
}
unsigned int created_threads = 0;
for(unsigned int i=0; i<thread_count; i++) {
bool created = AddWorkerThread();
if(created) {
created_threads++;
} else {
LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread");
}
}
if(created_threads != thread_count) {
LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create "
<< thread_count << " but only created " << created_threads
<< " worker threads. Make sure this process has adequate resources.");
}
return created_threads;
}
bool IoServiceImpl::AddWorkerThread() {
mutex_guard state_lock(state_lock_);
auto async_worker = [this]() {
this->ThreadStartHook();
this->Run();
this->ThreadExitHook();
};
worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) );
return true;
}
void IoServiceImpl::ThreadStartHook() {
mutex_guard state_lock(state_lock_);
LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " starting");
}
void IoServiceImpl::ThreadExitHook() {
mutex_guard state_lock(state_lock_);
LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting");
}
void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) {
io_service_.post(asyncTask);
}
void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) {
// It is far too easy to destroy the filesystem (and thus the threadpool)
// from within one of the worker threads, leading to a deadlock. Let's
// provide some explicit protection.
if(t->get_id() == std::this_thread::get_id()) {
LOG_ERROR(kAsyncRuntime, << "IoServiceImpl::WorkerDeleter::operator(threadptr="
<< t << ") : FATAL: Attempted to destroy a thread pool "
"from within a callback of the thread pool!");
}
t->join();
delete t;
}
// As long as this just forwards to an asio::io_service method it doesn't need a lock
void IoServiceImpl::Run() {
// The IoService executes callbacks provided by library users in the context of worker threads;
// there is no way to prevent those callbacks from throwing, but we can at least prevent them
@@ -33,7 +124,7 @@ void IoServiceImpl::Run() {
// As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
asio::io_service::work work(io_service_);
for(;;)
while(true)
{
try
{
@@ -47,5 +138,9 @@ void IoServiceImpl::Run() {
}
}
unsigned int IoServiceImpl::get_worker_thread_count() {
mutex_guard state_lock(state_lock_);
return worker_threads_.size();
}
}
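The hunks above elide the middle of IoServiceImpl::Run(). The shape of the loop is the usual asio pattern, roughly as in the sketch below (a paraphrase under that assumption, not the verbatim library code): a work object keeps run() from returning while the queue is momentarily empty, and exceptions escaping user callbacks are swallowed so the worker keeps servicing the queue.

#include <asio/io_service.hpp>

#include <exception>

// Sketch only: the general exception-containment pattern Run() follows.
void run_worker_loop_sketch(asio::io_service &io_service) {
  // Prevents run() from returning just because the task queue is empty.
  asio::io_service::work work(io_service);
  while (true) {
    try {
      io_service.run();
      break;  // run() returned normally, i.e. stop() was called
    } catch (const std::exception &) {
      // A user-supplied callback threw; keep the worker thread alive.
    } catch (...) {
      // Same policy for non-standard exceptions.
    }
  }
}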


@@ -22,23 +22,56 @@
#include "hdfspp/hdfspp.h"
#include <asio/io_service.hpp>
#include "common/util.h"
#include <mutex>
#include <thread>
namespace hdfs {
// Uncomment this to determine if issues are due to concurrency or logic faults
// If tests still fail with concurrency disabled it's most likely a logic bug
//#define DISABLE_CONCURRENT_WORKERS
/*
* A thin wrapper over the asio::io_service.
* -In the future this could own the worker threads that execute io tasks which
* makes it easier to share IoServices between FileSystems. See HDFS-10796 for
* rationale.
* A thin wrapper over the asio::io_service with a few extras:
* - manages its own worker threads
* - provides helpers for sharing with multiple modules that need to do async work
*/
class IoServiceImpl : public IoService {
public:
IoServiceImpl() {}
virtual unsigned int InitDefaultWorkers() override;
virtual unsigned int InitWorkers(unsigned int thread_count) override;
virtual void PostTask(std::function<void(void)>& asyncTask) override;
virtual void Run() override;
virtual void Stop() override { io_service_.stop(); }
// Add a single worker thread. In the common case prefer Init[Default]Workers
// instead; this is public for use by tests and by the rare client that wants
// very explicit control of threading for performance reasons, e.g. pinning
// threads to NUMA nodes.
bool AddWorkerThread();
// Be very careful about using this: HDFS-10241
::asio::io_service &io_service() { return io_service_; }
unsigned int get_worker_thread_count();
private:
std::mutex state_lock_;
::asio::io_service io_service_;
// For doing logging + resource manager updates on thread start/exit
void ThreadStartHook();
void ThreadExitHook();
// Support for async worker threads
struct WorkerDeleter {
void operator()(std::thread *t);
};
typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
std::vector<WorkerPtr> worker_threads_;
};
}
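For the rare caller that wants the explicit control mentioned in the comment above, a hypothetical test-style sketch (IoServiceImpl is an internal class, so this uses the internal header from this patch):

#include "hdfs_ioservice.h"  // internal header

// Grow the pool one worker at a time and check how many were actually created.
void build_pool_by_hand_sketch(hdfs::IoServiceImpl &io_service, unsigned int threads) {
  for (unsigned int i = 0; i < threads; i++) {
    if (!io_service.AddWorkerThread()) {
      break;  // stop on failure rather than spinning
    }
  }
  // May be smaller than the requested count if thread creation failed along the way.
  unsigned int created = io_service.get_worker_thread_count();
  (void)created;
}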


@@ -183,6 +183,11 @@ LogMessage& LogMessage::operator<<(void *ptr) {
}
LogMessage& LogMessage::operator<<(const std::thread::id& tid) {
msg_buffer_ << tid;
return *this;
}
std::string LogMessage::MsgString() const {
return msg_buffer_.str();
}
@@ -199,12 +204,13 @@ const char * LogMessage::level_string() const {
return kLevelStrings[level_];
}
const char * kComponentStrings[5] = {
"[Unknown ]",
"[RPC ]",
"[BlockReader ]",
"[FileHandle ]",
"[FileSystem ]"
const char * kComponentStrings[6] = {
"[Unknown ]",
"[RPC ]",
"[BlockReader ]",
"[FileHandle ]",
"[FileSystem ]",
"[Async Runtime ]",
};
const char * LogMessage::component_string() const {
@@ -213,6 +219,7 @@ const char * LogMessage::component_string() const {
case kBlockReader: return kComponentStrings[2];
case kFileHandle: return kComponentStrings[3];
case kFileSystem: return kComponentStrings[4];
case kAsyncRuntime: return kComponentStrings[5];
default: return kComponentStrings[0];
}
}


@@ -27,6 +27,7 @@
#include <sstream>
#include <mutex>
#include <memory>
#include <thread>
#include <asio/ip/tcp.hpp>
@@ -49,11 +50,12 @@ enum LogLevel {
};
enum LogSourceComponent {
kUnknown = 1 << 0,
kRPC = 1 << 1,
kBlockReader = 1 << 2,
kFileHandle = 1 << 3,
kFileSystem = 1 << 4,
kUnknown = 1 << 0,
kRPC = 1 << 1,
kBlockReader = 1 << 2,
kFileHandle = 1 << 3,
kFileSystem = 1 << 4,
kAsyncRuntime = 1 << 5,
};
#define LOG_TRACE(C, MSG) do { \
@@ -196,6 +198,9 @@ class LogMessage {
//asio types
LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
//thread and mutex types
LogMessage& operator<<(const std::thread::id& tid);
std::string MsgString() const;
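Together, the new kAsyncRuntime component and the std::thread::id stream overload let the async runtime tag messages with the worker that emitted them. A hypothetical call site, mirroring the ones in hdfs_ioservice.cc:

#include "common/logging.h"

#include <thread>

namespace hdfs {
// Sketch of a call site inside the library.
static void log_worker_start_sketch() {
  // kAsyncRuntime maps to the new "[Async Runtime ]" component string;
  // the std::thread::id operator<< added above formats the worker id.
  LOG_DEBUG(kAsyncRuntime, << "worker " << std::this_thread::get_id() << " entering run loop");
}
}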


@@ -39,7 +39,11 @@ Options::Options() : rpc_timeout(kDefaultRpcTimeout),
failover_max_retries(kDefaultFailoverMaxRetries),
failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries),
authentication(kDefaultAuthentication),
block_size(kDefaultBlockSize) {}
block_size(kDefaultBlockSize),
io_threads_(kDefaultIoThreads)
{
}
std::string NamenodeInfo::get_host() const {
return uri.get_host();


@@ -79,11 +79,20 @@ FileSystem *FileSystem::New(
return new FileSystemImpl(io_service, user_name, options);
}
FileSystem *FileSystem::New(
std::shared_ptr<IoService> io_service, const std::string &user_name, const Options &options) {
return new FileSystemImpl(io_service, user_name, options);
}
FileSystem *FileSystem::New() {
// No, this pointer won't be leaked. The FileSystem takes ownership.
IoService *io_service = IoService::New();
std::shared_ptr<IoService> io_service = IoService::MakeShared();
if(!io_service)
return nullptr;
int thread_count = io_service->InitDefaultWorkers();
if(thread_count < 1)
return nullptr;
std::string user_name = get_effective_user_name("");
Options options;
return new FileSystemImpl(io_service, user_name, options);
@@ -123,25 +132,56 @@ const std::string get_effective_user_name(const std::string &user_name) {
}
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) :
options_(options), client_name_(GetRandomClientName()), io_service_(
static_cast<IoServiceImpl *>(io_service)),
nn_(
&io_service_->io_service(), options, client_name_,
get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion
), bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
event_handlers_(std::make_shared<LibhdfsEvents>()) {
io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options),
client_name_(GetRandomClientName()),
nn_(
&io_service_->io_service(), options, client_name_,
get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion
),
bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
event_handlers_(std::make_shared<LibhdfsEvents>())
{
LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
<< FMT_THIS_ADDR << ") called");
// Poor man's move
io_service = nullptr;
/* spawn background threads for asio delegation */
unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
for (unsigned int i = 0; i < threads; i++) {
AddWorkerThread();
unsigned int running_workers = 0;
if(options.io_threads_ < 1) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads");
running_workers = io_service_->InitDefaultWorkers();
} else {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystenImpl Initializing " << options_.io_threads_ << " worker threads.");
running_workers = io_service_->InitWorkers(options_.io_threads_);
}
if(running_workers < 1) {
LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads");
}
}
FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) :
io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options),
client_name_(GetRandomClientName()),
nn_(
&io_service_->io_service(), options, client_name_,
get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion
),
bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
event_handlers_(std::make_shared<LibhdfsEvents>())
{
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
<< FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called");
int worker_thread_count = io_service_->get_worker_thread_count();
if(worker_thread_count < 1) {
LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. "
<< "It needs at least 1 worker to connect to an HDFS cluster.")
} else {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads.");
}
}
@@ -154,7 +194,6 @@ FileSystemImpl::~FileSystemImpl() {
* Once worker threads are joined and deleted the service can be deleted.
**/
io_service_->Stop();
worker_threads_.clear();
}
void FileSystemImpl::Connect(const std::string &server,
@@ -230,12 +269,21 @@ void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &,
int FileSystemImpl::AddWorkerThread() {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
<< FMT_THIS_ADDR << ") called."
<< " Existing thread count = " << worker_threads_.size());
<< " Existing thread count = " << WorkerThreadCount());
auto service_task = [](IoService *service) { service->Run(); };
worker_threads_.push_back(
WorkerPtr(new std::thread(service_task, io_service_.get())));
return worker_threads_.size();
if(!io_service_)
return -1;
io_service_->AddWorkerThread();
return 1;
}
int FileSystemImpl::WorkerThreadCount() {
if(!io_service_) {
return -1;
} else {
return io_service_->get_worker_thread_count();
}
}
void FileSystemImpl::Open(
@@ -714,21 +762,6 @@ void FileSystemImpl::DisallowSnapshot(const std::string &path,
nn_.DisallowSnapshot(path, handler);
}
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
// It is far too easy to destroy the filesystem (and thus the threadpool)
// from within one of the worker threads, leading to a deadlock. Let's
// provide some explicit protection.
if(t->get_id() == std::this_thread::get_id()) {
LOG_ERROR(kFileSystem, << "FileSystemImpl::WorkerDeleter::operator(treadptr="
<< t << ") : FATAL: Attempted to destroy a thread pool"
"from within a callback of the thread pool!");
}
t->join();
delete t;
}
void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
if (event_handlers_) {
event_handlers_->set_fs_callback(callback);


@@ -46,7 +46,8 @@ namespace hdfs {
class FileSystemImpl : public FileSystem {
public:
MEMCHECKED_CLASS(FileSystemImpl)
FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options);
~FileSystemImpl() override;
/* attempt to connect to namenode, return bad status on failure */
@@ -176,7 +177,7 @@ public:
int AddWorkerThread();
/* how many worker threads are servicing asio requests */
int WorkerThreadCount() { return worker_threads_.size(); }
int WorkerThreadCount();
/* all monitored events will need to lookup handlers */
std::shared_ptr<LibhdfsEvents> get_event_handlers();
@@ -184,24 +185,18 @@ public:
Options get_options();
private:
const Options options_;
const std::string client_name_;
std::string cluster_name_;
/**
* The IoService must be the first member variable to ensure that it gets
* destroyed last. This allows other members to dequeue things from the
* service in their own destructors.
**/
std::unique_ptr<IoServiceImpl> io_service_;
std::shared_ptr<IoServiceImpl> io_service_;
const Options options_;
const std::string client_name_;
std::string cluster_name_;
NameNodeOperations nn_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
struct WorkerDeleter {
void operator()(std::thread *t);
};
typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
std::vector<WorkerPtr> worker_threads_;
/**
* Runtime event monitoring handlers.
* Note: This is really handy to have for advanced usage but


@@ -112,6 +112,11 @@ add_executable(logging_test logging_test.cc)
target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(logging_test logging_test)
add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc)
target_link_libraries(hdfs_ioservice_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfs_ioservice hdfs_ioservice_test)
#
#
# INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS


@@ -113,7 +113,7 @@ private:
class MiniCluster {
public:
MiniCluster() : io_service(IoService::New()) {
MiniCluster() : io_service(IoService::MakeShared()) {
struct NativeMiniDfsConf conf = {
1, /* doFormat */
0, /* webhdfs */
@@ -137,6 +137,10 @@ public:
// Connect via the C++ API
FSHandle connect(const std::string username) {
Options options;
unsigned int worker_count = io_service->InitDefaultWorkers();
EXPECT_NE(0, worker_count);
FileSystem * fs = FileSystem::New(io_service, username, options);
EXPECT_NE(nullptr, fs);
FSHandle result(fs);
@@ -184,7 +188,7 @@ public:
protected:
struct NativeMiniDfsCluster* clusterInfo;
IoService * io_service;
std::shared_ptr<IoService> io_service;
};
} // namespace