HDFS-10931: libhdfs++: Fix object lifecycle issues in the BlockReader (see follow up work in jira). Contributed by James Clampffer.

This commit is contained in:
James 2016-10-05 11:55:06 -04:00 committed by James Clampffer
parent 69cb05d292
commit 2a42eeb66f
2 changed files with 125 additions and 90 deletions

View File

@ -231,7 +231,12 @@ static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti)
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n", fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n",
ti->threadIdx); ti->threadIdx);
EXPECT_NONNULL(ti->hdfs); EXPECT_NONNULL(ti->hdfs);
EXPECT_ZERO(doTestHdfsMiniStress(ti, 1)); // Error injection on, some failures are expected in the read path.
// The expectation is that any memory stomps will cascade and cause
// the following test to fail. Ideally RPC errors would be separated
// from BlockReader errors (RPC is expected to recover from disconnects).
doTestHdfsMiniStress(ti, 1);
// No error injection
EXPECT_ZERO(doTestHdfsMiniStress(ti, 0)); EXPECT_ZERO(doTestHdfsMiniStress(ti, 0));
return 0; return 0;
} }

View File

@ -30,11 +30,12 @@ namespace hdfs {
#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_ #define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_
#define FMT_THIS_ADDR "this=" << (void*)this #define FMT_THIS_ADDR "this=" << (void*)this
hadoop::hdfs::OpReadBlockProto
ReadBlockProto(const std::string &client_name, bool verify_checksum, // Stuff an OpReadBlockProto message with required fields.
const hadoop::common::TokenProto *token, hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name,
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, bool verify_checksum, const hadoop::common::TokenProto *token,
uint64_t offset) { const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
{
using namespace hadoop::hdfs; using namespace hadoop::hdfs;
using namespace hadoop::common; using namespace hadoop::common;
BaseHeaderProto *base_h = new BaseHeaderProto(); BaseHeaderProto *base_h = new BaseHeaderProto();
@ -55,13 +56,28 @@ ReadBlockProto(const std::string &client_name, bool verify_checksum,
return p; return p;
} }
//
// Notes about the BlockReader and associated object lifecycles (9/29/16)
// -We have several stages in the read pipeline. Each stage represents a logical
// step in the HDFS block transfer logic. They are implemented as continuations
// for now, and in some cases the stage may have a nested continuation as well.
// It's important to make sure that continuations, nested or otherwise, cannot
// outlive the objects they depend on.
//
// -The BlockReader holds a shared_ptr to the DataNodeConnection that's used in each
// pipeline stage. The connection object must never be destroyed while operations are
// pending on the ASIO side (see HDFS-10931). In order to prevent a state where the
// BlockReader or one of the corresponding pipelines outlives the connection, each
// pipeline stage must explicitly hold a shared pointer copied from BlockReaderImpl::dn_.
//
static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock}; static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
void BlockReaderImpl::AsyncRequestBlock( void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name,
const std::string &client_name,
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
uint64_t offset, const std::function<void(Status)> &handler) { uint64_t offset, const std::function<void(Status)> &handler)
{
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock("
<< FMT_THIS_ADDR << ", ..., length=" << FMT_THIS_ADDR << ", ..., length="
<< length << ", offset=" << offset << ", ...) called"); << length << ", offset=" << offset << ", ...) called");
@ -95,11 +111,9 @@ void BlockReaderImpl::AsyncRequestBlock(
} }
auto read_pb_message = auto read_pb_message =
new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>( new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(dn_, &s->response);
dn_, &s->response);
m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))) m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))).Push(read_pb_message);
.Push(read_pb_message);
m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status;
if (stat.ok()) { if (stat.ok()) {
@ -128,10 +142,10 @@ void BlockReaderImpl::AsyncRequestBlock(
}); });
} }
Status BlockReaderImpl::RequestBlock( Status BlockReaderImpl::RequestBlock(const std::string &client_name,
const std::string &client_name, const hadoop::hdfs::ExtendedBlockProto *block,
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t length, uint64_t offset)
uint64_t offset) { {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock("
<< FMT_THIS_ADDR <<"..., length=" << FMT_THIS_ADDR <<"..., length="
<< length << ", offset=" << offset << ") called"); << length << ", offset=" << offset << ") called");
@ -143,15 +157,9 @@ Status BlockReaderImpl::RequestBlock(
return future.get(); return future.get();
} }
hadoop::hdfs::OpReadBlockProto struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
ReadBlockProto(const std::string &client_name, bool verify_checksum, {
const hadoop::common::TokenProto *token, ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
uint64_t offset);
struct BlockReaderImpl::ReadPacketHeader
: continuation::Continuation {
ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {}
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run("
@ -184,8 +192,7 @@ struct BlockReaderImpl::ReadPacketHeader
asio::async_read(*parent_->dn_, asio::buffer(buf_), asio::async_read(*parent_->dn_, asio::buffer(buf_),
std::bind(&ReadPacketHeader::CompletionHandler, this, std::bind(&ReadPacketHeader::CompletionHandler, this,
std::placeholders::_1, std::placeholders::_2), std::placeholders::_1, std::placeholders::_2), handler);
handler);
} }
private: private:
@ -216,10 +223,14 @@ private:
return kHeaderStart + header_length() - transferred; return kHeaderStart + header_length() - transferred;
} }
} }
// Keep the DN connection alive
std::shared_ptr<DataNodeConnection> shared_conn_;
}; };
struct BlockReaderImpl::ReadChecksum : continuation::Continuation { struct BlockReaderImpl::ReadChecksum : continuation::Continuation
ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {} {
ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run("
@ -231,13 +242,15 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
return; return;
} }
auto handler = [parent, next, this](const asio::error_code &ec, size_t) { std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
auto handler = [parent, next, this, keep_conn_alive_](const asio::error_code &ec, size_t)
{
Status status; Status status;
if (ec) { if (ec) {
status = Status(ec.value(), ec.message().c_str()); status = Status(ec.value(), ec.message().c_str());
} else { } else {
parent->state_ = parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
} }
if(parent->event_handlers_) { if(parent->event_handlers_) {
event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
@ -249,20 +262,25 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
} }
next(status); next(status);
}; };
parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
parent->header_.datalen()); parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen());
asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler); asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
} }
private: private:
BlockReaderImpl *parent_; BlockReaderImpl *parent_;
// Keep the DataNodeConnection alive
std::shared_ptr<DataNodeConnection> shared_conn_;
}; };
struct BlockReaderImpl::ReadData : continuation::Continuation { struct BlockReaderImpl::ReadData : continuation::Continuation
ReadData(BlockReaderImpl *parent, {
std::shared_ptr<size_t> bytes_transferred, ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred,
const asio::mutable_buffers_1 &buf) const asio::mutable_buffers_1 &buf) : parent_(parent),
: parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) { bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_)
{
buf_.begin(); buf_.begin();
} }
@ -279,12 +297,15 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
if (ec) { if (ec) {
status = Status(ec.value(), ec.message().c_str()); status = Status(ec.value(), ec.message().c_str());
} }
*bytes_transferred_ += transferred; *bytes_transferred_ += transferred;
parent_->bytes_to_read_ -= transferred; parent_->bytes_to_read_ -= transferred;
parent_->packet_data_read_bytes_ += transferred; parent_->packet_data_read_bytes_ += transferred;
if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
parent_->state_ = kReadPacketHeader; parent_->state_ = kReadPacketHeader;
} }
if(parent_->event_handlers_) { if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
@ -296,24 +317,27 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
next(status); next(status);
}; };
auto data_len = auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_;
parent_->header_.datalen() - parent_->packet_data_read_bytes_;
asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), handler);
handler);
} }
private: private:
BlockReaderImpl *parent_; BlockReaderImpl *parent_;
std::shared_ptr<size_t> bytes_transferred_; std::shared_ptr<size_t> bytes_transferred_;
const asio::mutable_buffers_1 buf_; const asio::mutable_buffers_1 buf_;
// Keep DNConnection alive.
std::shared_ptr<DataNodeConnection> shared_conn_;
}; };
struct BlockReaderImpl::ReadPadding : continuation::Continuation { struct BlockReaderImpl::ReadPadding : continuation::Continuation
ReadPadding(BlockReaderImpl *parent) {
: parent_(parent), padding_(parent->chunk_padding_bytes_), ReadPadding(BlockReaderImpl *parent) : parent_(parent),
padding_(parent->chunk_padding_bytes_),
bytes_transferred_(std::make_shared<size_t>(0)), bytes_transferred_(std::make_shared<size_t>(0)),
read_data_(new ReadData( read_data_(new ReadData(parent, bytes_transferred_, asio::buffer(padding_))),
parent, bytes_transferred_, asio::buffer(padding_))) {} shared_conn_(parent->dn_) {}
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run("
@ -324,11 +348,12 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
return; return;
} }
auto h = [next, this](const Status &stat) { std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
auto h = [next, this, keep_conn_alive_](const Status &stat) {
Status status = stat; Status status = stat;
if (status.ok()) { if (status.ok()) {
assert(reinterpret_cast<const int &>(*bytes_transferred_) == assert(reinterpret_cast<const int &>(*bytes_transferred_) == parent_->chunk_padding_bytes_);
parent_->chunk_padding_bytes_);
parent_->chunk_padding_bytes_ = 0; parent_->chunk_padding_bytes_ = 0;
parent_->state_ = kReadData; parent_->state_ = kReadData;
} }
@ -352,11 +377,15 @@ private:
std::shared_ptr<continuation::Continuation> read_data_; std::shared_ptr<continuation::Continuation> read_data_;
ReadPadding(const ReadPadding &) = delete; ReadPadding(const ReadPadding &) = delete;
ReadPadding &operator=(const ReadPadding &) = delete; ReadPadding &operator=(const ReadPadding &) = delete;
// Keep DNConnection alive.
std::shared_ptr<DataNodeConnection> shared_conn_;
}; };
struct BlockReaderImpl::AckRead : continuation::Continuation { struct BlockReaderImpl::AckRead : continuation::Continuation
AckRead(BlockReaderImpl *parent) : parent_(parent) {} {
AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called");
@ -366,17 +395,18 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
return; return;
} }
auto m = auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
m->state().set_status(parent_->options_.verify_checksum m->state().set_status(parent_->options_.verify_checksum
? hadoop::hdfs::Status::CHECKSUM_OK ? hadoop::hdfs::Status::CHECKSUM_OK
: hadoop::hdfs::Status::SUCCESS); : hadoop::hdfs::Status::SUCCESS);
m->Push( m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
m->Run([this, next](const Status &stat, std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
const hadoop::hdfs::ClientReadStatusProto &) {
m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &)
{
Status status = stat; Status status = stat;
if (status.ok()) { if (status.ok()) {
parent_->state_ = BlockReaderImpl::kFinished; parent_->state_ = BlockReaderImpl::kFinished;
@ -395,11 +425,14 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
private: private:
BlockReaderImpl *parent_; BlockReaderImpl *parent_;
// Keep DNConnection alive.
std::shared_ptr<DataNodeConnection> shared_conn_;
}; };
void BlockReaderImpl::AsyncReadPacket( void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
const MutableBuffers &buffers, const std::function<void(const Status &, size_t bytes_transferred)> &handler)
const std::function<void(const Status &, size_t bytes_transferred)> &handler) { {
assert(state_ != kOpen && "Not connected"); assert(state_ != kOpen && "Not connected");
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called"); LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called");
@ -407,9 +440,11 @@ void BlockReaderImpl::AsyncReadPacket(
struct State { struct State {
std::shared_ptr<size_t> bytes_transferred; std::shared_ptr<size_t> bytes_transferred;
}; };
auto m = continuation::Pipeline<State>::Create(cancel_state_); auto m = continuation::Pipeline<State>::Create(cancel_state_);
m->state().bytes_transferred = std::make_shared<size_t>(0); m->state().bytes_transferred = std::make_shared<size_t>(0);
// Note: some of these continuations have nested pipelines.
m->Push(new ReadPacketHeader(this)) m->Push(new ReadPacketHeader(this))
.Push(new ReadChecksum(this)) .Push(new ReadChecksum(this))
.Push(new ReadPadding(this)) .Push(new ReadPadding(this))
@ -424,9 +459,8 @@ void BlockReaderImpl::AsyncReadPacket(
} }
size_t size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status)
BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, {
Status *status) {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called"); LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
size_t transferred = 0; size_t transferred = 0;
@ -443,12 +477,12 @@ BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
} }
struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation { struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation
{
RequestBlockContinuation(BlockReader *reader, const std::string &client_name, RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
const hadoop::hdfs::ExtendedBlockProto *block, const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
uint64_t length, uint64_t offset) : reader_(reader), client_name_(client_name), length_(length), offset_(offset)
: reader_(reader), client_name_(client_name), length_(length), {
offset_(offset) {
block_.CheckTypeAndMergeFrom(*block); block_.CheckTypeAndMergeFrom(*block);
} }
@ -456,8 +490,7 @@ struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run("
<< FMT_CONT_AND_READER_ADDR << ") called"); << FMT_CONT_AND_READER_ADDR << ") called");
reader_->AsyncRequestBlock(client_name_, &block_, length_, reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next);
offset_, next);
} }
private: private:
@ -468,12 +501,10 @@ private:
uint64_t offset_; uint64_t offset_;
}; };
struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation { struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, {
size_t *transferred) ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred)
: reader_(reader), buffer_(buffer), : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
}
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run("
@ -511,7 +542,8 @@ void BlockReaderImpl::AsyncReadBlock(
const hadoop::hdfs::LocatedBlockProto &block, const hadoop::hdfs::LocatedBlockProto &block,
size_t offset, size_t offset,
const MutableBuffers &buffers, const MutableBuffers &buffers,
const std::function<void(const Status &, size_t)> handler) { const std::function<void(const Status &, size_t)> handler)
{
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
<< FMT_THIS_ADDR << ") called"); << FMT_THIS_ADDR << ") called");
@ -520,12 +552,10 @@ void BlockReaderImpl::AsyncReadBlock(
size_t size = asio::buffer_size(buffers); size_t size = asio::buffer_size(buffers);
m->Push(new RequestBlockContinuation(this, client_name, m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset))
&block.b(), size, offset))
.Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
m->Run([handler] (const Status &status, m->Run([handler] (const Status &status, const size_t totalBytesTransferred) {
const size_t totalBytesTransferred) {
handler(status, totalBytesTransferred); handler(status, totalBytesTransferred);
}); });
} }