diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c index 8d4b74302d..887af8e68b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c @@ -285,6 +285,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti) fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", ti->threadIdx); EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL)); + if (!fs) + return 1; EXPECT_ZERO(setupPaths(ti, &paths)); // test some operations EXPECT_ZERO(doTestHdfsOperations(ti, fs, &paths)); @@ -295,6 +297,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti) EXPECT_ZERO(hdfsDisconnect(fs)); // reconnect to do the final delete. EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL)); + if (!fs) + return 1; EXPECT_ZERO(hdfsDelete(fs, paths.prefix, 1)); EXPECT_ZERO(hdfsDisconnect(fs)); return 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h index dfff20b34a..7dc3f88dea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -115,15 +115,16 @@ class FileSystem { * initializes the RPC connections to the NameNode and returns an * FileSystem object. 
**/ - static void New( - IoService *io_service, const Options &options, const std::string &server, - const std::string &service, - const std::function &handler); + static FileSystem * New( + IoService *&io_service, const Options &options); - /* Synchronous call of New*/ - static FileSystem * - New(IoService *io_service, const Options &options, const std::string &server, - const std::string &service); + virtual void Connect(const std::string &server, + const std::string &service, + const std::function &&handler) = 0; + + /* Synchronous call of Connect */ + virtual Status Connect(const std::string &server, + const std::string &service) = 0; /** * Open a file on HDFS. The call issues an RPC to the NameNode to @@ -135,6 +136,10 @@ class FileSystem { const std::function &handler) = 0; virtual Status Open(const std::string &path, FileHandle **handle) = 0; + /** + * Note that it is an error to destroy the filesystem from within a filesystem + * callback. It will lead to a deadlock and the termination of the process. 
+ */ virtual ~FileSystem() {}; }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 802b3ea913..d23c7b0d63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -112,8 +112,12 @@ int hdfsFileIsOpenForRead(hdfsFile file) { hdfsFS hdfsConnect(const char *nn, tPort port) { std::string port_as_string = std::to_string(port); IoService * io_service = IoService::New(); - FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string); + FileSystem *fs = FileSystem::New(io_service, Options()); if (!fs) { + return nullptr; + } + + if (!fs->Connect(nn, port_as_string).ok()) { ReportError(ENODEV, "Unable to connect to NameNode."); // FileSystem's ctor might take ownership of the io_service; if it does, @@ -121,6 +125,8 @@ hdfsFS hdfsConnect(const char *nn, tPort port) { if (io_service) delete io_service; + delete fs; + return nullptr; } return new hdfs_internal(fs); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 1808b85fc1..1a1163b394 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -26,6 +26,7 @@ #include #include #include +#include namespace hdfs { @@ -41,7 +42,7 @@ using ::asio::ip::tcp; void NameNodeOperations::Connect(const std::string &server, const std::string &service, - std::function &handler) { + std::function &&handler) { using namespace asio_continuation; typedef std::vector State; auto m = Pipeline::Create(); @@ -106,41 
+107,9 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, * FILESYSTEM BASE CLASS ****************************************************************************/ -void FileSystem::New( - IoService *io_service, const Options &options, const std::string &server, - const std::string &service, - const std::function &handler) { - FileSystemImpl *impl = new FileSystemImpl(io_service, options); - impl->Connect(server, service, [impl, handler](const Status &stat) { - if (stat.ok()) { - handler(stat, impl); - } else { - delete impl; - handler(stat, nullptr); - } - }); -} - FileSystem * FileSystem::New( - IoService *io_service, const Options &options, const std::string &server, - const std::string &service) { - auto callstate = std::make_shared>>(); - std::future> future(callstate->get_future()); - - auto callback = [callstate](const Status &s, FileSystem * fs) { - callstate->set_value(std::make_tuple(s, fs)); - }; - - New(io_service, options, server, service, callback); - - /* block until promise is set */ - auto returnstate = future.get(); - - if (std::get<0>(returnstate).ok()) { - return std::get<1>(returnstate); - } else { - return nullptr; - } + IoService *&io_service, const Options &options) { + return new FileSystemImpl(io_service, options); } /***************************************************************************** @@ -175,12 +144,17 @@ FileSystemImpl::~FileSystemImpl() { void FileSystemImpl::Connect(const std::string &server, const std::string &service, - std::function &&handler) { + const std::function &&handler) { /* IoService::New can return nullptr */ if (!io_service_) { - handler (Status::Error("Null IoService")); + handler (Status::Error("Null IoService"), this); + // Failure already reported; connecting would invoke handler again. + return; } - nn_.Connect(server, service, handler); + + nn_.Connect(server, service, [this, handler](const Status & s) { + handler(s, this); + }); } Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { @@ -188,7 +162,8 @@ Status 
FileSystemImpl::Connect(const std::string &server, const std::string &ser auto stat = std::make_shared>(); std::future future = stat->get_future(); - auto callback = [stat](const Status &s) { + auto callback = [stat](const Status &s, FileSystem *fs) { + (void)fs; stat->set_value(s); }; @@ -247,4 +222,17 @@ Status FileSystemImpl::Open(const std::string &path, return stat; } +void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) { + // It is far too easy to destroy the filesystem (and thus the threadpool) + // from within one of the worker threads, leading to a deadlock. Let's + // provide some explicit protection. + if(t->get_id() == std::this_thread::get_id()) { + //TODO: When we get good logging support, add it in here + std::cerr << "FATAL: Attempted to destroy a thread pool from within a " + "callback of the thread pool.\n"; + } + t->join(); + delete t; +} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 772f93b8f6..d78df81797 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -55,7 +55,7 @@ public: void Connect(const std::string &server, const std::string &service, - std::function &handler); + std::function &&handler); void GetBlockLocations(const std::string & path, std::function)> handler); @@ -85,9 +85,9 @@ public: /* attempt to connect to namenode, return bad status on failure */ void Connect(const std::string &server, const std::string &service, - std::function &&handler); + const std::function &&handler) override; /* attempt to connect to namenode, return bad status on failure */ - Status Connect(const std::string &server, const std::string &service); + Status Connect(const std::string &server, const std::string &service) override; 
virtual void Open(const std::string &path, @@ -116,10 +116,7 @@ private: std::shared_ptr bad_node_tracker_; struct WorkerDeleter { - void operator()(std::thread *t) { - t->join(); - delete t; - } + void operator()(std::thread *t); }; typedef std::unique_ptr WorkerPtr; std::vector worker_threads_;