From 427edae365ab0ce77f83af942c1c9a3ee0e7efcc Mon Sep 17 00:00:00 2001 From: James Date: Thu, 24 Mar 2016 00:18:59 -0400 Subject: [PATCH] HDFS-9616. libhdfs++: Add runtime hooks to allow a client application to add low level monitoring and tests. Contributed by Bob Hansen --- .../native/libhdfspp/include/hdfspp/events.h | 110 ++++++++++++++++++ .../libhdfspp/include/hdfspp/hdfs_ext.h | 55 +++++++++ .../native/libhdfspp/include/hdfspp/hdfspp.h | 24 ++++ .../native/libhdfspp/lib/bindings/c/hdfs.cc | 76 ++++++++++++ .../libhdfspp/lib/common/CMakeLists.txt | 2 +- .../lib/common/libhdfs_events_impl.cc | 51 ++++++++ .../lib/common/libhdfs_events_impl.h | 73 ++++++++++++ .../lib/connection/datanodeconnection.cc | 3 +- .../lib/connection/datanodeconnection.h | 10 +- .../native/libhdfspp/lib/fs/filehandle.cc | 59 ++++++++-- .../main/native/libhdfspp/lib/fs/filehandle.h | 26 ++++- .../native/libhdfspp/lib/fs/filesystem.cc | 36 ++++-- .../main/native/libhdfspp/lib/fs/filesystem.h | 17 ++- .../libhdfspp/lib/rpc/rpc_connection.cc | 10 ++ .../native/libhdfspp/lib/rpc/rpc_connection.h | 22 +++- .../native/libhdfspp/lib/rpc/rpc_engine.cc | 31 +++-- .../native/libhdfspp/lib/rpc/rpc_engine.h | 21 +++- .../libhdfspp/tests/bad_datanode_test.cc | 73 +++++++++++- .../native/libhdfspp/tests/rpc_engine_test.cc | 74 +++++++++++- 19 files changed, 731 insertions(+), 42 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h new file mode 100644 index 0000000000..82109fd1e6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef HDFSPP_EVENTS +#define HDFSPP_EVENTS + +#include "hdfspp/status.h" + +#include + +namespace hdfs { + +/* + * Supported event names. These names will stay consistent in libhdfs callbacks. + * + * Other events not listed here may be seen, but they are not stable and + * should not be counted on. + */ + +static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect"; +static constexpr const char * FS_NN_READ_EVENT = "NN::read"; +static constexpr const char * FS_NN_WRITE_EVENT = "NN::write"; + +static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect"; +static constexpr const char * FILE_DN_READ_EVENT = "DN::read"; +static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write"; + + + +class event_response { +public: +// Create a response +enum event_response_type { + kOk = 0, + +#ifndef NDEBUG + // Responses to be used in testing only + kTest_Error = 100 +#endif +}; + + + // The default ok response; libhdfspp should continue normally + static event_response ok() { return event_response(); } + event_response_type response() { return response_; } + +private: + event_response() : response_(event_response_type::kOk) {}; + + event_response_type response_; + + + +/////////////////////////////////////////////// +// +// Testing support +// +// If running a debug build, the consumer can stimulate errors +// within libhdfdspp by returning a Status from the callback. +/////////////////////////////////////////////// +#ifndef NDEBUG +public: + static event_response test_err(const Status &status) { + return event_response(status); + } + + Status status() { return error_status_; } + +private: + event_response(const Status & status) : + response_(event_response_type::kTest_Error), error_status_(status) {} + + Status error_status_; // To be used with kTest_Error +#endif +}; + + + +/* callback signature */ +typedef std::function< + event_response (const char * event, + const char * cluster, + int64_t value)> + fs_event_callback; + +typedef std::function< + event_response (const char * event, + const char * cluster, + const char * file, + int64_t value)> + file_event_callback; + + +} +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index 1e73bc59ee..1e1e6aac71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -162,6 +162,61 @@ int hdfsDisableLoggingForComponent(int component); LIBHDFS_EXTERNAL int hdfsSetLoggingLevel(int component); +/* + * Supported event names. These names will stay consistent in libhdfs callbacks. + * + * Other events not listed here may be seen, but they are not stable and + * should not be counted on. + */ +extern const char * FS_NN_CONNECT_EVENT; +extern const char * FS_NN_READ_EVENT; +extern const char * FS_NN_WRITE_EVENT; + +extern const char * FILE_DN_CONNECT_EVENT; +extern const char * FILE_DN_READ_EVENT; +extern const char * FILE_DN_WRITE_EVENT; + + +#define LIBHDFSPP_EVENT_OK (0) +#ifndef NDEBUG + #define DEBUG_SIMULATE_ERROR (-1) +#endif + +typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster, + int64_t value, int64_t cookie); +typedef int (*libhdfspp_file_event_callback)(const char * event, + const char * cluster, + const char * file, + int64_t value, int64_t cookie); + +/** + * Registers a callback for the next filesystem connect operation the current + * thread executes. + * + * @param handler A function pointer. Taken as a void* and internally + * cast into the appropriate type. + * @param cookie An opaque value that will be passed into the handler; can + * be used to correlate the handler with some object in the + * consumer's space. + **/ +LIBHDFS_EXTERNAL +int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie); + + +/** + * Registers a callback for the next file open operation the current thread + * executes. + * + * @param fs The filesystem + * @param handler A function pointer. Taken as a void* and internally + * cast into the appropriate type. + * @param cookie An opaque value that will be passed into the handler; can + * be used to correlate the handler with some object in the + * consumer's space. + **/ +LIBHDFS_EXTERNAL +int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie); + #ifdef __cplusplus } /* end extern "C" */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index b52e832aa4..674dc4a25e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -20,6 +20,7 @@ #include "hdfspp/options.h" #include "hdfspp/status.h" +#include "hdfspp/events.h" #include #include @@ -108,6 +109,18 @@ public: **/ static bool ShouldExclude(const Status &status); + + /** + * Sets an event callback for file-level event notifications (such as connecting + * to the DataNode, communications errors, etc.) + * + * Many events are defined in hdfspp/events.h; the consumer should also expect + * to be called with many private events, which can be ignored. + * + * @param callback The function to call when a reporting event occurs. + */ + virtual void SetFileEventCallback(file_event_callback callback) = 0; + virtual ~FileHandle(); }; @@ -161,6 +174,17 @@ class FileSystem { */ virtual ~FileSystem() {}; + + /** + * Sets an event callback for fs-level event notifications (such as connecting + * to the NameNode, communications errors with the NN, etc.) + * + * Many events are defined in hdfspp/events.h; the consumer should also expect + * to be called with many private events, which can be ignored. + * + * @param callback The function to call when a reporting event occurs. + */ + virtual void SetFsEventCallback(fs_event_callback callback) = 0; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 9ce5c86ec5..cc0d964243 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -30,9 +30,11 @@ #include #include #include +#include using namespace hdfs; using std::experimental::nullopt; +using namespace std::placeholders; static constexpr tPort kDefaultPort = 8020; @@ -81,6 +83,10 @@ void hdfsGetLastError(char *buf, int len) { buf[copylen] = 0; } +/* Event callbacks for next open calls */ +thread_local std::experimental::optional fsEventCallback; +thread_local std::experimental::optional fileEventCallback; + struct hdfsBuilder { hdfsBuilder(); hdfsBuilder(const char * directory); @@ -197,6 +203,10 @@ hdfsFS doHdfsConnect(optional nn, optional port, optionalSetFsEventCallback(fsEventCallback.value()); + } + Status status; if (nn || port) { if (!port) { @@ -399,6 +409,72 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) { } } + +/******************************************************************* + * EVENT CALLBACKS + *******************************************************************/ + +const char * FS_NN_CONNECT_EVENT = hdfs::FS_NN_CONNECT_EVENT; +const char * FS_NN_READ_EVENT = hdfs::FS_NN_READ_EVENT; +const char * FS_NN_WRITE_EVENT = hdfs::FS_NN_WRITE_EVENT; + +const char * FILE_DN_CONNECT_EVENT = hdfs::FILE_DN_CONNECT_EVENT; +const char * FILE_DN_READ_EVENT = hdfs::FILE_DN_READ_EVENT; +const char * FILE_DN_WRITE_EVENT = hdfs::FILE_DN_WRITE_EVENT; + + +event_response fs_callback_glue(libhdfspp_fs_event_callback handler, + int64_t cookie, + const char * event, + const char * cluster, + int64_t value) { + int result = handler(event, cluster, value, cookie); + if (result == LIBHDFSPP_EVENT_OK) { + return event_response::ok(); + } +#ifndef NDEBUG + if (result == DEBUG_SIMULATE_ERROR) { + return event_response::test_err(Status::Error("Simulated error")); + } +#endif + + return event_response::ok(); +} + +event_response file_callback_glue(libhdfspp_file_event_callback handler, + int64_t cookie, + const char * event, + const char * cluster, + const char * file, + int64_t value) { + int result = handler(event, cluster, file, value, cookie); + if (result == LIBHDFSPP_EVENT_OK) { + return event_response::ok(); + } +#ifndef NDEBUG + if (result == DEBUG_SIMULATE_ERROR) { + return event_response::test_err(Status::Error("Simulated error")); + } +#endif + + return event_response::ok(); +} + +int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie) +{ + fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3); + fsEventCallback = callback; + return 0; +} + + +int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie) +{ + file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4); + fileEventCallback = callback; + return 0; +} + /******************************************************************* * BUILDER INTERFACE *******************************************************************/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index 77860b02bd..ea2f952b86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc) +add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc) add_library(common $ $) target_link_libraries(common ${LIB_DL}) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc new file mode 100644 index 0000000000..bcf9cccd0e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "libhdfs_events_impl.h" + +namespace hdfs { + +/** + * Default no-op callback implementations + **/ + +LibhdfsEvents::LibhdfsEvents() : fs_callback(std::experimental::nullopt), + file_callback(std::experimental::nullopt) +{} + +LibhdfsEvents::~LibhdfsEvents() {} + +void LibhdfsEvents::set_fs_callback(const fs_event_callback & callback) { + fs_callback = callback; +} + +void LibhdfsEvents::set_file_callback(const file_event_callback & callback) { + file_callback = callback; +} + +void LibhdfsEvents::clear_fs_callback() { + fs_callback = std::experimental::nullopt; +} + +void LibhdfsEvents::clear_file_callback() { + file_callback = std::experimental::nullopt; +} + + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h new file mode 100644 index 0000000000..122f7b08c6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL +#define LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL + +#include "hdfspp/events.h" + + +#include +#include + +namespace hdfs { + +/** + * Users can specify event handlers. Default is a no-op handler. + **/ +class LibhdfsEvents { +public: + LibhdfsEvents(); + virtual ~LibhdfsEvents(); + + void set_fs_callback(const fs_event_callback & callback); + void set_file_callback(const file_event_callback & callback); + void clear_fs_callback(); + void clear_file_callback(); + + event_response call(const char * event, + const char * cluster, + int64_t value) { + if (fs_callback) { + return fs_callback->operator ()(event, cluster, value); + } else { + return event_response::ok(); + } + } + + event_response call(const char * event, + const char * cluster, + const char * file, + int64_t value) { + if (file_callback) { + return file_callback->operator ()(event, cluster, file, value); + } else { + return event_response::ok(); + } + } + +private: + // Called when fs events occur + std::experimental::optional fs_callback; + + // Called when file events occur + std::experimental::optional file_callback; +}; + +} +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc index 247c75e3a7..19878ab228 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -26,7 +26,8 @@ DataNodeConnectionImpl::~DataNodeConnectionImpl(){} DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, - const hadoop::common::TokenProto *token) + const hadoop::common::TokenProto *token, + LibhdfsEvents *event_handlers) : event_handlers_(event_handlers) { using namespace ::asio::ip; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h index 8f64110aba..6cb7f4a8a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -21,6 +21,7 @@ #include "common/hdfs_public_api.h" #include "common/async_stream.h" #include "ClientNamenodeProtocol.pb.h" +#include "common/libhdfs_events_impl.h" #include "asio.hpp" @@ -42,10 +43,12 @@ public: std::unique_ptr conn_; std::array endpoints_; std::string uuid_; + LibhdfsEvents *event_handlers_; virtual ~DataNodeConnectionImpl(); DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, - const hadoop::common::TokenProto *token); + const hadoop::common::TokenProto *token, + LibhdfsEvents *event_handlers); void Connect(std::function dn)> handler) override; @@ -54,12 +57,17 @@ public: void async_read_some(const MutableBuffers &buf, std::function handler) override { + event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin()); + conn_->async_read_some(buf, handler); }; void async_write_some(const ConstBuffers &buf, std::function handler) override { + + event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin()); + conn_->async_write_some(buf, handler); } }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index b3954e134a..471281aa5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -21,6 +21,7 @@ #include "common/logging.h" #include "connection/datanodeconnection.h" #include "reader/block_reader.h" +#include "hdfspp/events.h" #include #include @@ -33,13 +34,17 @@ using ::hadoop::hdfs::LocatedBlocksProto; FileHandle::~FileHandle() {} -FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name, +FileHandleImpl::FileHandleImpl(const std::string & cluster_name, + const std::string & path, + ::asio::io_service *io_service, const std::string &client_name, const std::shared_ptr file_info, - std::shared_ptr bad_data_nodes) - : io_service_(io_service), client_name_(client_name), file_info_(file_info), - bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) { + std::shared_ptr bad_data_nodes, + std::shared_ptr event_handlers) + : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), + bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers) { LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" << FMT_THIS_ADDR << ", ...) called"); + } void FileHandleImpl::PositionRead( @@ -228,14 +233,34 @@ void FileHandleImpl::AsyncPreadSome( std::shared_ptr reader; reader = CreateBlockReader(BlockReaderOptions(), dn); + // Lambdas cannot capture copies of member variables so we'll make explicit + // copies for it + auto event_handlers = event_handlers_; + auto path = path_; + auto cluster_name = cluster_name_; + + auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { + auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); +#ifndef NDEBUG + if (event_resp.response() == event_response::kTest_Error) { + handler(event_resp.status(), dn_id, transferred); + return; + } +#endif - auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) { handler(status, dn_id, transferred); }; - dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] + auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] (Status status, std::shared_ptr dn) { (void)dn; + auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); +#ifndef NDEBUG + if (event_resp.response() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + if (status.ok()) { reader->AsyncReadBlock( client_name, *block, offset_within_block, @@ -243,7 +268,9 @@ void FileHandleImpl::AsyncPreadSome( } else { handler(status, dn_id, 0); } - }); + }; + + dn->Connect(connect_handler); return; } @@ -267,7 +294,11 @@ std::shared_ptr FileHandleImpl::CreateDataNodeConnection( const hadoop::common::TokenProto * token) { LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection(" << FMT_THIS_ADDR << ", ...) called"); - return std::make_shared(io_service, dn, token); + return std::make_shared(io_service, dn, token, event_handlers_.get()); +} + +std::shared_ptr FileHandleImpl::get_event_handlers() { + return event_handlers_; } void FileHandleImpl::CancelOperations() { @@ -283,6 +314,18 @@ void FileHandleImpl::CancelOperations() { } } +void FileHandleImpl::SetFileEventCallback(file_event_callback callback) { + std::shared_ptr new_event_handlers; + if (event_handlers_) { + new_event_handlers = std::make_shared(*event_handlers_); + } else { + new_event_handlers = std::make_shared(); + } + new_event_handlers->set_file_callback(callback); + event_handlers_ = new_event_handlers; +} + + bool FileHandle::ShouldExclude(const Status &s) { if (s.ok()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 8c03b37b14..0fb014b8a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -21,6 +21,7 @@ #include "common/hdfs_public_api.h" #include "common/async_stream.h" #include "common/cancel_tracker.h" +#include "common/libhdfs_events_impl.h" #include "reader/fileinfo.h" #include "reader/readergroup.h" @@ -48,9 +49,12 @@ class DataNodeConnection; */ class FileHandleImpl : public FileHandle { public: - FileHandleImpl(::asio::io_service *io_service, const std::string &client_name, + FileHandleImpl(const std::string & cluster_name, + const std::string & path, + ::asio::io_service *io_service, const std::string &client_name, const std::shared_ptr file_info, - std::shared_ptr bad_data_nodes); + std::shared_ptr bad_data_nodes, + std::shared_ptr event_handlers); /* * [Some day reliably] Reads a particular offset into the data file. @@ -58,9 +62,9 @@ public: * success, bytes_read will equal nbyte */ void PositionRead( - void *buf, - size_t nbyte, - uint64_t offset, + void *buf, + size_t nbyte, + uint64_t offset, const std::function &handler ) override; @@ -96,7 +100,6 @@ public: const std::function handler); - /** * Cancels all operations instantiated from this FileHandle. * Will set a flag to abort continuation pipelines when they try to move to the next step. @@ -104,6 +107,14 @@ public: **/ virtual void CancelOperations(void) override; + virtual void SetFileEventCallback(file_event_callback callback) override; + + /** + * Ephemeral objects created by the filehandle will need to get the event + * handler registry owned by the FileSystem. + **/ + std::shared_ptr get_event_handlers(); + protected: virtual std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, std::shared_ptr dn); @@ -112,6 +123,8 @@ protected: const ::hadoop::hdfs::DatanodeInfoProto & dn, const hadoop::common::TokenProto * token); private: + const std::string cluster_name_; + const std::string path_; ::asio::io_service * const io_service_; const std::string client_name_; const std::shared_ptr file_info_; @@ -120,6 +133,7 @@ private: off_t offset_; CancelHandle cancel_state_; ReaderGroup readers_; + std::shared_ptr event_handlers_; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 8f386ed5a2..569b479458 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -47,7 +47,8 @@ static constexpr uint16_t kDefaultPort = 8020; * NAMENODE OPERATIONS ****************************************************************************/ -void NameNodeOperations::Connect(const std::string &server, +void NameNodeOperations::Connect(const std::string &cluster_name, + const std::string &server, const std::string &service, std::function &&handler) { using namespace asio_continuation; @@ -55,8 +56,8 @@ void NameNodeOperations::Connect(const std::string &server, auto m = Pipeline::Create(); m->Push(Resolve(io_service_, server, service, std::back_inserter(m->state()))) - .Push(Bind([this, m](const Continuation::Next &next) { - engine_.Connect(m->state(), next); + .Push(Bind([this, m, cluster_name](const Continuation::Next &next) { + engine_.Connect(cluster_name, m->state(), next); })); m->Run([this, handler](const Status &status, const State &) { handler(status); @@ -113,6 +114,10 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, } +void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) { + engine_.SetFsEventCallback(callback); +} + /***************************************************************************** * FILESYSTEM BASE CLASS ****************************************************************************/ @@ -162,7 +167,8 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n nn_(&io_service_->io_service(), options, GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol, kNamenodeProtocolVersion), client_name_(GetRandomClientName()), - bad_node_tracker_(std::make_shared()) + bad_node_tracker_(std::make_shared()), + event_handlers_(std::make_shared()) { LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl(" << FMT_THIS_ADDR << ") called"); @@ -201,7 +207,9 @@ void FileSystemImpl::Connect(const std::string &server, handler (Status::Error("Null IoService"), this); } - nn_.Connect(server, service, [this, handler](const Status & s) { + cluster_name_ = server + ":" + service; + + nn_.Connect(cluster_name_, server, service, [this, handler](const Status & s) { handler(s, this); }); } @@ -288,8 +296,8 @@ void FileSystemImpl::Open( << FMT_THIS_ADDR << ", path=" << path << ") called"); - nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr file_info) { - handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_) + nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr file_info) { + handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) : nullptr); }); } @@ -340,4 +348,18 @@ void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) { delete t; } + +void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) { + if (event_handlers_) { + event_handlers_->set_fs_callback(callback); + nn_.SetFsEventCallback(callback); + } +} + + + +std::shared_ptr FileSystemImpl::get_event_handlers() { + return event_handlers_; +} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index b208a6cca6..24854c018d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -19,6 +19,7 @@ #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ #include "filehandle.h" +#include "common/libhdfs_events_impl.h" #include "common/hdfs_public_api.h" #include "common/async_stream.h" #include "hdfspp/hdfspp.h" @@ -53,13 +54,15 @@ public: engine_(io_service, options, client_name, user_name, protocol_name, protocol_version), namenode_(& engine_) {} - void Connect(const std::string &server, + void Connect(const std::string &cluster_name, + const std::string &server, const std::string &service, std::function &&handler); void GetBlockLocations(const std::string & path, std::function)> handler); + void SetFsEventCallback(fs_event_callback callback); private: ::asio::io_service * io_service_; RpcEngine engine_; @@ -100,6 +103,8 @@ public: Status Open(const std::string &path, FileHandle **handle) override; + void SetFsEventCallback(fs_event_callback callback) override; + /* add a new thread to handle asio requests, return number of threads in pool */ int AddWorkerThread(); @@ -107,9 +112,13 @@ public: /* how many worker threads are servicing asio requests */ int WorkerThreadCount() { return worker_threads_.size(); } + /* all monitored events will need to lookup handlers */ + std::shared_ptr get_event_handlers(); private: const Options options_; + + std::string cluster_name_; /** * The IoService must be the first member variable to ensure that it gets * destroyed last. This allows other members to dequeue things from the @@ -126,6 +135,12 @@ private: typedef std::unique_ptr WorkerPtr; std::vector worker_threads_; + /** + * Runtime event monitoring handlers. + * Note: This is really handy to have for advanced usage but + * exposes implementation details that may change at any time. + **/ + std::shared_ptr event_handlers_; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index c65c063c7f..bed33474f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -321,6 +321,16 @@ void RpcConnection::PreEnqueueRequests( // Don't start sending yet; will flush when connected } +void RpcConnection::SetEventHandlers(std::shared_ptr event_handlers) { + std::lock_guard state_lock(connection_state_lock_); + event_handlers_ = event_handlers; +} + +void RpcConnection::SetClusterName(std::string cluster_name) { + std::lock_guard state_lock(connection_state_lock_); + cluster_name_ = cluster_name; +} + void RpcConnection::CommsError(const Status &status) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 3413438a7f..cab14fa290 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -22,6 +22,7 @@ #include "common/logging.h" #include "common/util.h" +#include "common/libhdfs_events_impl.h" #include #include @@ -111,6 +112,15 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called"); Status status = ToStatus(ec); + if(event_handlers_) { + auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); +#ifndef NDEBUG + if (event_resp.response() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + } + if (status.ok()) { StartReading(); Handshake([shared_this, this](const Status & s) { @@ -241,7 +251,7 @@ void RpcConnectionImpl::FlushPendingRequests() { template -void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ec, +void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &asio_ec, size_t) { using std::placeholders::_1; using std::placeholders::_2; @@ -251,6 +261,16 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ec, std::shared_ptr shared_this = shared_from_this(); + ::asio::error_code ec = asio_ec; + if(event_handlers_) { + auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); +#ifndef NDEBUG + if (event_resp.response() == event_response::kTest_Error) { + ec = std::make_error_code(std::errc::network_down); + } +#endif + } + switch (ec.value()) { case 0: // No errors diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index b598d0fe01..70b50cfa88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -39,18 +39,21 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, protocol_version_(protocol_version), retry_policy_(std::move(MakeRetryPolicy(options))), call_id_(0), - retry_timer(*io_service) { + retry_timer(*io_service), + event_handlers_(std::make_shared()) { LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called"); - } +} -void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server, +void RpcEngine::Connect(const std::string &cluster_name, + const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler) { std::lock_guard state_lock(engine_state_lock_); LOG_DEBUG(kRPC, << "RpcEngine::Connect called"); last_endpoints_ = server; + cluster_name_ = cluster_name; - conn_ = NewConnection(); + conn_ = InitializeConnection(); conn_->Connect(last_endpoints_, handler); } @@ -85,7 +88,7 @@ void RpcEngine::AsyncRpc( LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called"); if (!conn_) { - conn_ = NewConnection(); + conn_ = InitializeConnection(); conn_->ConnectAndFlush(last_endpoints_); } conn_->AsyncRpc(method_name, req, resp, handler); @@ -111,6 +114,14 @@ std::shared_ptr RpcEngine::NewConnection() return std::make_shared>(this); } +std::shared_ptr RpcEngine::InitializeConnection() +{ + std::shared_ptr result = NewConnection(); + result->SetEventHandlers(event_handlers_); + result->SetClusterName(cluster_name_); + return result; +} + Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, std::shared_ptr resp) { @@ -120,7 +131,7 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, { std::lock_guard state_lock(engine_state_lock_); if (!conn_) { - conn_ = NewConnection(); + conn_ = InitializeConnection(); conn_->ConnectAndFlush(last_endpoints_); } conn = conn_; @@ -185,7 +196,7 @@ void RpcEngine::RpcCommsError( // the NN if (!pendingRequests.empty() && head_action && head_action->action != RetryAction::FAIL) { - conn_ = NewConnection(); + conn_ = InitializeConnection(); conn_->PreEnqueueRequests(pendingRequests); if (head_action->delayMillis > 0) { @@ -203,4 +214,10 @@ void RpcEngine::RpcCommsError( } } + +void RpcEngine::SetFsEventCallback(fs_event_callback callback) { + event_handlers_->set_fs_callback(callback); +} + + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 75d4e67ab8..7b66ac03c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -22,6 +22,7 @@ #include "hdfspp/status.h" #include "common/retry_policy.h" +#include "common/libhdfs_events_impl.h" #include #include @@ -131,6 +132,9 @@ class RpcConnection : public std::enable_shared_from_this { // on connect void PreEnqueueRequests(std::vector> requests); + void SetEventHandlers(std::shared_ptr event_handlers); + void SetClusterName(std::string cluster_name); + LockFreeRpcEngine *engine() { return engine_; } ::asio::io_service &io_service(); @@ -186,6 +190,10 @@ class RpcConnection : public std::enable_shared_from_this { // Requests that are waiting for responses typedef std::unordered_map> RequestOnFlyMap; RequestOnFlyMap requests_on_fly_; + std::shared_ptr event_handlers_; + std::string cluster_name_; + + // Lock for mutable parts of this class that need to be thread safe std::mutex connection_state_lock_; }; @@ -234,7 +242,9 @@ class RpcEngine : public LockFreeRpcEngine { const std::string &client_name, const std::string &user_name, const char *protocol_name, int protocol_version); - void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler); + void Connect(const std::string & cluster_name, + const std::vector<::asio::ip::tcp::endpoint> &server, + RpcCallback &handler); void AsyncRpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, @@ -272,13 +282,17 @@ class RpcEngine : public LockFreeRpcEngine { ::asio::io_service &io_service() override { return *io_service_; } const Options &options() const override { return options_; } static std::string GetRandomClientName(); - protected: + + void SetFsEventCallback(fs_event_callback callback); +protected: std::shared_ptr conn_; + std::shared_ptr InitializeConnection(); virtual std::shared_ptr NewConnection(); virtual std::unique_ptr MakeRetryPolicy(const Options &options); // Remember all of the last endpoints in case we need to reconnect and retry std::vector<::asio::ip::tcp::endpoint> last_endpoints_; + private: ::asio::io_service * const io_service_; const Options options_; @@ -287,9 +301,12 @@ private: const std::string protocol_name_; const int protocol_version_; const std::unique_ptr retry_policy_; //null --> no retry + std::string cluster_name_; std::atomic_int call_id_; ::asio::deadline_timer retry_timer; + std::shared_ptr event_handlers_; + std::mutex engine_state_lock_; }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 4741817725..01d723f281 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -18,6 +18,7 @@ #include "fs/filesystem.h" #include "fs/bad_datanode_tracker.h" +#include "common/libhdfs_events_impl.h" #include "common/util.h" @@ -129,9 +130,10 @@ TEST(BadDataNodeTest, TestNoNodes) { }; IoServiceImpl io_service; auto bad_node_tracker = std::make_shared(); + auto monitors = std::make_shared(); bad_node_tracker->AddBadNode("foo"); - PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker); + PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker, monitors); Status stat; size_t read = 0; @@ -147,6 +149,69 @@ TEST(BadDataNodeTest, TestNoNodes) { ASSERT_EQ(0UL, read); } +TEST(BadDataNodeTest, NNEventCallback) { + auto file_info = std::make_shared(); + file_info->blocks_.push_back(LocatedBlockProto()); + LocatedBlockProto & block = file_info->blocks_[0]; + ExtendedBlockProto *b = block.mutable_b(); + b->set_poolid(""); + b->set_blockid(1); + b->set_generationstamp(1); + b->set_numbytes(4096); + + // Set up the one block to have one datanodes holding it + DatanodeInfoProto *di = block.add_locs(); + DatanodeIDProto *dnid = di->mutable_id(); + dnid->set_datanodeuuid("dn1"); + + char buf[4096] = { + 0, + }; + IoServiceImpl io_service; + auto tracker = std::make_shared(); + + + // Set up event callbacks + int calls = 0; + std::vector callbacks; + auto monitors = std::make_shared(); + monitors->set_file_callback([&calls, &callbacks] (const char * event, + const char * cluster, + const char * file, + int64_t value) { + (void)cluster; (void) file; (void)value; + callbacks.push_back(event); + + // Allow connect call to succeed by fail on read + if (calls++ == 1) + return event_response::test_err(Status::Error("Test")); + + return event_response::ok(); + }); + PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); + Status stat; + size_t read = 0; + + EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) + // Will return OK, but our callback will subvert it + .WillOnce(InvokeArgument<4>( + Status::OK(), 0)); + + is.AsyncPreadSome( + 0, asio::buffer(buf, sizeof(buf)), nullptr, + [&stat, &read](const Status &status, const std::string &, + size_t transferred) { + stat = status; + read = transferred; + }); + + ASSERT_FALSE(stat.ok()); + ASSERT_EQ(2, callbacks.size()); + ASSERT_EQ(FILE_DN_CONNECT_EVENT, callbacks[0]); + ASSERT_EQ(FILE_DN_READ_EVENT, callbacks[1]); +} + + TEST(BadDataNodeTest, RecoverableError) { auto file_info = std::make_shared(); file_info->blocks_.push_back(LocatedBlockProto()); @@ -167,7 +232,8 @@ TEST(BadDataNodeTest, RecoverableError) { }; IoServiceImpl io_service; auto tracker = std::make_shared(); - PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker); + auto monitors = std::make_shared(); + PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); Status stat; size_t read = 0; EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) @@ -216,7 +282,8 @@ TEST(BadDataNodeTest, InternalError) { }; IoServiceImpl io_service; auto tracker = std::make_shared(); - PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker); + auto monitors = std::make_shared(); + PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); Status stat; size_t read = 0; EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index de9972e1b8..b7d5d0bb21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -266,7 +266,7 @@ TEST(RpcEngineTest, TestConnectionFailure) EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); - engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) { + engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { complete = true; io_service.stop(); ASSERT_FALSE(stat.ok()); @@ -294,7 +294,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); - engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) { + engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { complete = true; io_service.stop(); ASSERT_FALSE(stat.ok()); @@ -322,7 +322,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover) .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); - engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) { + engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { complete = true; io_service.stop(); ASSERT_TRUE(stat.ok()); @@ -331,6 +331,72 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover) ASSERT_TRUE(complete); } +TEST(RpcEngineTest, TestEventCallbacks) +{ + ::asio::io_service io_service; + Options options; + options.max_rpc_retries = 99; + options.rpc_retry_delay_ms = 0; + SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1); + + // Set up event callbacks + int calls = 0; + std::vector callbacks; + engine.SetFsEventCallback([&calls, &callbacks] (const char * event, + const char * cluster, + int64_t value) { + (void)cluster; (void)value; + callbacks.push_back(event); + + // Allow connect and fail first read + calls++; + if (calls == 1 || calls == 3) // First connect and first read + return event_response::test_err(Status::Error("Test")); + + return event_response::ok(); + }); + + + + EchoResponseProto server_resp; + server_resp.set_message("foo"); + + auto producer = std::make_shared(); + producer->checkProducerForConnect = true; + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) // subverted by callback + .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) + .WillOnce(Return(RpcResponse(h, "b"))) // subverted by callback + .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); + SharedMockConnection::SetSharedConnectionData(producer); + + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr resp(new EchoResponseProto()); + + bool complete = false; + engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_TRUE(stat.ok()); + }); + io_service.run(); + ASSERT_TRUE(complete); + ASSERT_EQ(7, callbacks.size()); + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect + ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect + for (int i=4; i < 7; i++) + ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]); +} + + + TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover) { // Error and async recover @@ -351,7 +417,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover) .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); - engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) { + engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { complete = true; io_service.stop(); ASSERT_TRUE(stat.ok());