HDFS-11758: libhdfs++: Catch exceptions thrown by runtime hooks. Contributed by James Clampffer.

This commit is contained in:
James Clampffer 2017-05-16 15:29:51 -04:00
parent 57cdad73de
commit c2386bc2f3
11 changed files with 281 additions and 70 deletions

View File

@ -51,24 +51,54 @@ static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
class event_response {
public:
// Create a response
enum event_response_type {
kOk = 0,
// Responses to be used in testing only
kTest_Error = 100
};
// Helper factories
// The default ok response; libhdfspp should continue normally
static event_response ok() { return event_response(); }
event_response_type response() { return response_; }
static event_response make_ok() {
return event_response(kOk);
}
static event_response make_caught_std_exception(const char *what) {
return event_response(kCaughtStdException, what);
}
static event_response make_caught_unknown_exception() {
return event_response(kCaughtUnknownException);
}
// High level classification of responses
enum event_response_type {
kOk = 0,
// User supplied callback threw.
// Std exceptions will copy the what() string
kCaughtStdException = 1,
kCaughtUnknownException = 2,
// Responses to be used in testing only
kTest_Error = 100
};
event_response_type response_type() { return response_type_; }
private:
event_response() : response_(event_response_type::kOk) {};
// Use factories to construct for now
event_response();
event_response(event_response_type type)
: response_type_(type)
{
if(type == kCaughtUnknownException) {
status_ = Status::Exception("c++ unknown exception", "");
}
}
event_response(event_response_type type, const char *what)
: response_type_(type),
exception_msg_(what==nullptr ? "" : what)
{
status_ = Status::Exception("c++ std::exception", exception_msg_.c_str());
}
event_response_type response_;
event_response_type response_type_;
// use to hold what str if event handler threw
std::string exception_msg_;
///////////////////////////////////////////////
@ -83,31 +113,23 @@ public:
return event_response(status);
}
Status status() { return error_status_; }
Status status() { return status_; }
private:
event_response(const Status & status) :
response_(event_response_type::kTest_Error), error_status_(status) {}
response_type_(event_response_type::kTest_Error), status_(status) {}
Status error_status_; // To be used with kTest_Error
Status status_; // To be used with kTest_Error
};
/* callback signature */
typedef std::function<
event_response (const char * event,
const char * cluster,
int64_t value)>
fs_event_callback;
typedef std::function<
event_response (const char * event,
const char * cluster,
const char * file,
int64_t value)>
file_event_callback;
typedef std::function<event_response (const char * event,
const char * cluster,
int64_t value)> fs_event_callback;
typedef std::function<event_response (const char * event,
const char * cluster,
const char * file,
int64_t value)>file_event_callback;
}
#endif

View File

@ -1462,7 +1462,7 @@ event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
int64_t value) {
int result = handler(event, cluster, value, cookie);
if (result == LIBHDFSPP_EVENT_OK) {
return event_response::ok();
return event_response::make_ok();
}
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (result == DEBUG_SIMULATE_ERROR) {
@ -1470,7 +1470,7 @@ event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
}
#endif
return event_response::ok();
return event_response::make_ok();
}
event_response file_callback_glue(libhdfspp_file_event_callback handler,
@ -1481,7 +1481,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler,
int64_t value) {
int result = handler(event, cluster, file, value, cookie);
if (result == LIBHDFSPP_EVENT_OK) {
return event_response::ok();
return event_response::make_ok();
}
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (result == DEBUG_SIMULATE_ERROR) {
@ -1489,7 +1489,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler,
}
#endif
return event_response::ok();
return event_response::make_ok();
}
HDFS_EXT_API

View File

@ -18,6 +18,8 @@
#include "libhdfs_events_impl.h"
#include <exception>
namespace hdfs {
/**
@ -46,6 +48,42 @@ void LibhdfsEvents::clear_file_callback() {
file_callback = std::experimental::nullopt;
}
event_response LibhdfsEvents::call(const char * event,
const char * cluster,
int64_t value)
{
if (fs_callback) {
try {
return fs_callback->operator()(event, cluster, value);
} catch (const std::exception& e) {
return event_response::make_caught_std_exception(e.what());
} catch (...) {
// Arguably calling abort() here would serve as appropriate
// punishment for those who throw garbage that isn't derived
// from std::exception...
return event_response::make_caught_unknown_exception();
}
} else {
return event_response::make_ok();
}
}
event_response LibhdfsEvents::call(const char * event,
const char * cluster,
const char * file,
int64_t value)
{
if (file_callback) {
try {
return file_callback->operator()(event, cluster, file, value);
} catch (const std::exception& e) {
return event_response::make_caught_std_exception(e.what());
} catch (...) {
return event_response::make_caught_unknown_exception();
}
} else {
return event_response::make_ok();
}
}
}

View File

@ -40,27 +40,14 @@ public:
void clear_fs_callback();
void clear_file_callback();
event_response call(const char * event,
const char * cluster,
int64_t value) {
if (fs_callback) {
return fs_callback->operator ()(event, cluster, value);
} else {
return event_response::ok();
}
}
event_response call(const char * event,
const char * cluster,
const char * file,
int64_t value) {
if (file_callback) {
return file_callback->operator ()(event, cluster, file, value);
} else {
return event_response::ok();
}
}
event_response call(const char *event,
const char *cluster,
int64_t value);
event_response call(const char *event,
const char *cluster,
const char *file,
int64_t value);
private:
// Called when fs events occur
std::experimental::optional<fs_event_callback> fs_callback;

View File

@ -256,7 +256,7 @@ void FileHandleImpl::AsyncPreadSome(
auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) {
if (event_resp.response_type() == event_response::kTest_Error) {
handler(event_resp.status(), dn_id, transferred);
return;
}
@ -270,7 +270,7 @@ void FileHandleImpl::AsyncPreadSome(
(void)dn;
event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) {
if (event_resp.response_type() == event_response::kTest_Error) {
status = event_resp.status();
}
#endif

View File

@ -122,7 +122,7 @@ void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name,
if(this->event_handlers_) {
event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
if (stat.ok() && event_resp.response_type() == event_response::kTest_Error) {
stat = Status::Error("Test error");
}
#endif
@ -182,7 +182,7 @@ struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
@ -255,7 +255,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation
if(parent->event_handlers_) {
event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
@ -309,7 +309,7 @@ struct BlockReaderImpl::ReadData : continuation::Continuation
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
@ -360,7 +360,7 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
@ -414,7 +414,7 @@ struct BlockReaderImpl::AckRead : continuation::Continuation
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif

View File

@ -189,7 +189,7 @@ Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
if(event_handlers_) {
event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) {
if (event_resp.response_type() == event_response::kTest_Error) {
status = event_resp.status();
}
#endif

View File

@ -179,7 +179,7 @@ void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, co
if(event_handlers_) {
event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) {
if (event_resp.response_type() == event_response::kTest_Error) {
status = event_resp.status();
}
#endif
@ -370,7 +370,7 @@ void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &origin
if(event_handlers_) {
event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) {
if (event_resp.response_type() == event_response::kTest_Error) {
my_ec = std::make_error_code(std::errc::network_down);
}
#endif

View File

@ -189,7 +189,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
if (calls++ == 1)
return event_response::test_err(Status::Error("Test"));
return event_response::ok();
return event_response::make_ok();
});
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors);
Status stat;

View File

@ -18,7 +18,10 @@
#include "hdfspp_mini_dfs.h"
#include "hdfspp/hdfs_ext.h"
#include <cstring>
#include <chrono>
#include <exception>
namespace hdfs {
@ -324,8 +327,8 @@ TEST_F(HdfsExtTest, TestEOF) {
EXPECT_EQ(0, hdfsCloseFile(fs, file));
}
//Testing hdfsExists
TEST_F(HdfsExtTest, TestExists) {
//Testing hdfsExists
TEST_F(HdfsExtTest, TestExists) {
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
@ -585,8 +588,169 @@ TEST_F(HdfsExtTest, TestWorkingDirectory) {
}
// Flags used to test event handlers
static int connect_callback_invoked = 0;
int basic_fs_callback(const char *event, const char *cluster, int64_t value, int64_t cookie) {
(void)cluster;
(void)value;
if(::strstr(FS_NN_CONNECT_EVENT, event) && cookie == 0xFFF0) {
connect_callback_invoked = 1;
}
return LIBHDFSPP_EVENT_OK;
}
// Make sure event handler gets called during connect
TEST_F(HdfsExtTest, TestConnectEvent) {
connect_callback_invoked = 0;
hdfsPreAttachFSMonitor(basic_fs_callback, 0xFFF0);
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
EXPECT_EQ(connect_callback_invoked, 1);
}
int throwing_fs_callback(const char *event, const char *cluster, int64_t value, int64_t cookie) {
(void)cluster;
(void)value;
if(::strstr(FS_NN_CONNECT_EVENT, event) && cookie == 0xFFF1) {
connect_callback_invoked = 1;
throw std::runtime_error("Throwing in callbacks is a bad thing.");
}
return LIBHDFSPP_EVENT_OK;
}
// Make sure throwing in the connect event handler doesn't prevent connection
TEST_F(HdfsExtTest, TestConnectEventThrow) {
connect_callback_invoked = 0;
hdfsPreAttachFSMonitor(throwing_fs_callback, 0xFFF1);
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
EXPECT_EQ(connect_callback_invoked, 1);
}
int char_throwing_fs_callback(const char *event, const char *cluster, int64_t value, int64_t cookie) {
(void)cluster;
(void)value;
if(::strstr(FS_NN_CONNECT_EVENT, event) && cookie == 0xFFF2) {
connect_callback_invoked = 1;
throw "Throwing non std::exceptions in callbacks is even worse.";
}
return LIBHDFSPP_EVENT_OK;
}
TEST_F(HdfsExtTest, TestConnectEventThrowChar) {
connect_callback_invoked = 0;
hdfsPreAttachFSMonitor(char_throwing_fs_callback, 0xFFF2);
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
EXPECT_EQ(connect_callback_invoked, 1);
}
// Make sure throwing in the read event handler doesn't prevent reads
int read_handler_invokation_count = 0;
int basic_read_event_handler(const char *event, const char *cluster, const char *file,
int64_t value, int64_t cookie)
{
(void)cluster;
(void)file;
(void)value;
if(::strstr(FILE_DN_READ_EVENT, event) && cookie == 0xFFF3) {
read_handler_invokation_count += 1;
}
return LIBHDFSPP_EVENT_OK;
}
// Testing that read handler is called.
// Note: This is counting calls to async_read rather than hdfsPread.
// Typically a call to hdfs(P)Read that doesn't span blocks/packets
// invokes async_read 6 times; 4 more than required (improving that
// in HDFS-11266).
TEST_F(HdfsExtTest, TestReadEvent) {
read_handler_invokation_count = 0;
hdfsPreAttachFileMonitor(basic_read_event_handler, 0xFFF3);
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
//Write to a file
errno = 0;
int size = 256;
std::string path = "/readEventTest";
hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
EXPECT_NE(nullptr, file);
void * buf = malloc(size);
memset(buf, ' ', size);
EXPECT_EQ(size, hdfsWrite(fs, file, buf, size));
free(buf);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
//Test that read counters are getting incremented
char buffer[300];
file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
EXPECT_EQ(read_handler_invokation_count, 6);
EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
EXPECT_EQ(read_handler_invokation_count, 12);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
}
int throwing_read_event_handler(const char *event, const char *cluster, const char *file,
int64_t value, int64_t cookie)
{
(void)cluster;
(void)file;
(void)value;
if(::strstr(FILE_DN_READ_EVENT, event) && cookie == 0xFFF4) {
read_handler_invokation_count += 1;
throw std::runtime_error("Throwing here is a bad idea, but shouldn't break reads");
}
return LIBHDFSPP_EVENT_OK;
}
// Testing that reads can be done when event handler throws.
TEST_F(HdfsExtTest, TestReadEventThrow) {
read_handler_invokation_count = 0;
hdfsPreAttachFileMonitor(throwing_read_event_handler, 0xFFF4);
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
//Write to a file
errno = 0;
int size = 256;
std::string path = "/readEventTest";
hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
EXPECT_NE(nullptr, file);
void * buf = malloc(size);
memset(buf, ' ', size);
EXPECT_EQ(size, hdfsWrite(fs, file, buf, size));
free(buf);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
//Test that read counters are getting incremented
char buffer[300];
file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
EXPECT_EQ(read_handler_invokation_count, 6);
EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
EXPECT_EQ(read_handler_invokation_count, 12);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
}
} // end namespace hdfs
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.

View File

@ -367,7 +367,7 @@ TEST(RpcEngineTest, TestEventCallbacks)
if (calls == 1 || calls == 3) // First connect and first read
return event_response::test_err(Status::Error("Test"));
return event_response::ok();
return event_response::make_ok();
});