diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
index 0d01e447c3..dca4782d9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
@@ -231,7 +231,12 @@ static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti)
   fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n",
           ti->threadIdx);
   EXPECT_NONNULL(ti->hdfs);
-  EXPECT_ZERO(doTestHdfsMiniStress(ti, 1));
+  // Error injection is on; some failures are expected in the read path.
+  // The expectation is that any memory stomps will cascade and cause
+  // the following test to fail. Ideally RPC errors would be separated
+  // from BlockReader errors (RPC is expected to recover from disconnects).
+  doTestHdfsMiniStress(ti, 1);
+  // No error injection
   EXPECT_ZERO(doTestHdfsMiniStress(ti, 0));
   return 0;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
index 6098b9cde1..b63ef76f9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -30,11 +30,12 @@ namespace hdfs {
 #define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_
 #define FMT_THIS_ADDR "this=" << (void*)this
 
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
-               const hadoop::common::TokenProto *token,
-               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-               uint64_t offset) {
+
+// Stuff an OpReadBlockProto message with required fields.
+hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name,
+    bool verify_checksum, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
+{
   using namespace hadoop::hdfs;
   using namespace hadoop::common;
   BaseHeaderProto *base_h = new BaseHeaderProto();
@@ -55,13 +56,28 @@ ReadBlockProto(const std::string &client_name, bool verify_checksum,
   return p;
 }
 
+//
+// Notes about the BlockReader and associated object lifecycles (9/29/16)
+// -We have several stages in the read pipeline. Each stage represents a logical
+//  step in the HDFS block transfer logic. They are implemented as continuations
+//  for now, and in some cases the stage may have a nested continuation as well.
+//  It's important to make sure that continuations, nested or otherwise, cannot
+//  outlive the objects they depend on.
+//
+// -The BlockReader holds a shared_ptr to the DataNodeConnection that's used in each
+//  pipeline stage. The connection object must never be destroyed while operations are
+//  pending on the ASIO side (see HDFS-10931). In order to prevent a state where the
+//  BlockReader or one of the corresponding pipelines outlives the connection, each
+//  pipeline stage must explicitly hold a shared pointer copied from BlockReaderImpl::dn_.
+//
+
 static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
 
-void BlockReaderImpl::AsyncRequestBlock(
-    const std::string &client_name,
+void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name,
     const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-    uint64_t offset, const std::function<void(Status)> &handler) {
+    uint64_t offset, const std::function<void(Status)> &handler)
+{
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock("
                           << FMT_THIS_ADDR << ", ..., length="
                           << length << ", offset=" << offset << ", ...) called");
@@ -95,24 +111,22 @@ void BlockReaderImpl::AsyncRequestBlock(
   }
 
   auto read_pb_message =
-      new continuation::ReadDelimitedPBMessageContinuation(
-          dn_, &s->response);
+      new continuation::ReadDelimitedPBMessageContinuation(dn_, &s->response);
 
-  m->Push(asio_continuation::Write(dn_, asio::buffer(s->header)))
-      .Push(read_pb_message);
+  m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))).Push(read_pb_message);
 
   m->Run([this, handler, offset](const Status &status, const State &s) {
     Status stat = status;
     if (stat.ok()) {
       const auto &resp = s.response;
-      if(this->event_handlers_) {
-        event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+
+      if(this->event_handlers_) {
+        event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
 #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-        if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
-          stat = Status::Error("Test error");
-        }
+        if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
+          stat = Status::Error("Test error");
+        }
 #endif
-      }
+      }
 
       if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
         if (resp.has_readopchecksuminfo()) {
@@ -128,10 +142,10 @@ void BlockReaderImpl::AsyncRequestBlock(
   });
 }
 
-Status BlockReaderImpl::RequestBlock(
-    const std::string &client_name,
-    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-    uint64_t offset) {
+Status BlockReaderImpl::RequestBlock(const std::string &client_name,
+                                     const hadoop::hdfs::ExtendedBlockProto *block,
+                                     uint64_t length, uint64_t offset)
+{
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock("
                           << FMT_THIS_ADDR <<"..., length="
                           << length << ", offset=" << offset << ") called");
@@ -143,15 +157,9 @@ Status BlockReaderImpl::RequestBlock(
   return future.get();
 }
 
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
-               const hadoop::common::TokenProto *token,
-               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-               uint64_t offset);
-
-struct BlockReaderImpl::ReadPacketHeader
-    : continuation::Continuation {
-  ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {}
+struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
+{
+  ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
 
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run("
@@ -184,8 +192,7 @@ struct BlockReaderImpl::ReadPacketHeader
     asio::async_read(*parent_->dn_, asio::buffer(buf_),
                      std::bind(&ReadPacketHeader::CompletionHandler, this,
-                               std::placeholders::_1, std::placeholders::_2),
-                     handler);
+                               std::placeholders::_1, std::placeholders::_2), handler);
   }
 
 private:
@@ -216,10 +223,14 @@ private:
       return kHeaderStart + header_length() - transferred;
     }
   }
+
+  // Keep the DN connection alive
+  std::shared_ptr<DataNodeConnection> shared_conn_;
 };
 
-struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
-  ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {}
+struct BlockReaderImpl::ReadChecksum : continuation::Continuation
+{
+  ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
 
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run("
@@ -231,13 +242,15 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
       return;
     }
 
-    auto handler = [parent, next, this](const asio::error_code &ec, size_t) {
+    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
+
+    auto handler = [parent, next, this, keep_conn_alive_](const asio::error_code &ec, size_t)
+    {
       Status status;
       if (ec) {
        status = Status(ec.value(), ec.message().c_str());
      } else {
-        parent->state_ =
-            parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+        parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
      }
      if(parent->event_handlers_) {
        event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
@@ -249,20 +262,25 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
      }
      next(status);
    };
-    parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
-                             parent->header_.datalen());
+
+    parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen());
+
     asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
   }
 
 private:
   BlockReaderImpl *parent_;
+
+  // Keep the DataNodeConnection alive
+  std::shared_ptr<DataNodeConnection> shared_conn_;
 };
 
-struct BlockReaderImpl::ReadData : continuation::Continuation {
-  ReadData(BlockReaderImpl *parent,
-           std::shared_ptr<size_t> bytes_transferred,
-           const asio::mutable_buffers_1 &buf)
-      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {
+struct BlockReaderImpl::ReadData : continuation::Continuation
+{
+  ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred,
+           const asio::mutable_buffers_1 &buf) : parent_(parent),
+           bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_)
+  {
     buf_.begin();
   }
 
@@ -279,12 +297,15 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
       if (ec) {
         status = Status(ec.value(), ec.message().c_str());
       }
+
       *bytes_transferred_ += transferred;
       parent_->bytes_to_read_ -= transferred;
       parent_->packet_data_read_bytes_ += transferred;
+
       if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
         parent_->state_ = kReadPacketHeader;
       }
+
       if(parent_->event_handlers_) {
         event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
 #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
@@ -296,24 +317,27 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
       next(status);
     };
 
-    auto data_len =
-        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
-    asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len),
-                     handler);
+    auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+
+    asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), handler);
   }
 
 private:
   BlockReaderImpl *parent_;
   std::shared_ptr<size_t> bytes_transferred_;
   const asio::mutable_buffers_1 buf_;
+
+  // Keep DNConnection alive.
+  std::shared_ptr<DataNodeConnection> shared_conn_;
 };
 
-struct BlockReaderImpl::ReadPadding : continuation::Continuation {
-  ReadPadding(BlockReaderImpl *parent)
-      : parent_(parent), padding_(parent->chunk_padding_bytes_),
+struct BlockReaderImpl::ReadPadding : continuation::Continuation
+{
+  ReadPadding(BlockReaderImpl *parent) : parent_(parent),
+    padding_(parent->chunk_padding_bytes_),
        bytes_transferred_(std::make_shared<size_t>(0)),
-      read_data_(new ReadData(
-          parent, bytes_transferred_, asio::buffer(padding_))) {}
+    read_data_(new ReadData(parent, bytes_transferred_, asio::buffer(padding_))),
+    shared_conn_(parent->dn_) {}
 
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run("
@@ -324,11 +348,12 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
       return;
     }
 
-    auto h = [next, this](const Status &stat) {
+    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
+
+    auto h = [next, this, keep_conn_alive_](const Status &stat) {
      Status status = stat;
      if (status.ok()) {
-        assert(reinterpret_cast(*bytes_transferred_) ==
-               parent_->chunk_padding_bytes_);
+        assert(reinterpret_cast(*bytes_transferred_) == parent_->chunk_padding_bytes_);
        parent_->chunk_padding_bytes_ = 0;
        parent_->state_ = kReadData;
      }
@@ -352,11 +377,15 @@ private:
  std::shared_ptr read_data_;
  ReadPadding(const ReadPadding &) = delete;
  ReadPadding &operator=(const ReadPadding &) = delete;
+
+  // Keep DNConnection alive.
+  std::shared_ptr<DataNodeConnection> shared_conn_;
 };
 
-struct BlockReaderImpl::AckRead : continuation::Continuation {
-  AckRead(BlockReaderImpl *parent) : parent_(parent) {}
+struct BlockReaderImpl::AckRead : continuation::Continuation
+{
+  AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
 
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run("
                             << FMT_CONT_AND_PARENT_ADDR << ") called");
@@ -366,17 +395,18 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
      return;
    }
 
-    auto m =
-        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
+    auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
+
    m->state().set_status(parent_->options_.verify_checksum
                              ? hadoop::hdfs::Status::CHECKSUM_OK
                              : hadoop::hdfs::Status::SUCCESS);
 
-    m->Push(
-        continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
+    m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
 
-    m->Run([this, next](const Status &stat,
-                        const hadoop::hdfs::ClientReadStatusProto &) {
+    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
+
+    m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &)
+    {
      Status status = stat;
      if (status.ok()) {
        parent_->state_ = BlockReaderImpl::kFinished;
@@ -395,11 +425,14 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
 
 private:
   BlockReaderImpl *parent_;
+
+  // Keep DNConnection alive.
+  std::shared_ptr<DataNodeConnection> shared_conn_;
 };
 
-void BlockReaderImpl::AsyncReadPacket(
-    const MutableBuffers &buffers,
-    const std::function<void(const Status &, size_t)> &handler) {
+void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> &handler)
+{
   assert(state_ != kOpen && "Not connected");
 
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called");
@@ -407,9 +440,11 @@ void BlockReaderImpl::AsyncReadPacket(
   struct State {
     std::shared_ptr<size_t> bytes_transferred;
   };
+
   auto m = continuation::Pipeline<State>::Create(cancel_state_);
   m->state().bytes_transferred = std::make_shared<size_t>(0);
+
+  // Note: some of these continuations have nested pipelines.
   m->Push(new ReadPacketHeader(this))
       .Push(new ReadChecksum(this))
       .Push(new ReadPadding(this))
@@ -424,9 +459,8 @@ void BlockReaderImpl::AsyncReadPacket(
 }
 
-size_t
-BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
-                            Status *status) {
+size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status)
+{
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
 
   size_t transferred = 0;
@@ -443,12 +477,12 @@ BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
 }
 
-struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
+struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation
+{
   RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
-                           const hadoop::hdfs::ExtendedBlockProto *block,
-                           uint64_t length, uint64_t offset)
-      : reader_(reader), client_name_(client_name), length_(length),
-        offset_(offset) {
+        const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
+      : reader_(reader), client_name_(client_name), length_(length), offset_(offset)
+  {
     block_.CheckTypeAndMergeFrom(*block);
   }
 
@@ -456,8 +490,7 @@ struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run("
                             << FMT_CONT_AND_READER_ADDR << ") called");
 
-    reader_->AsyncRequestBlock(client_name_, &block_, length_,
-                               offset_, next);
+    reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next);
   }
 
 private:
@@ -468,12 +501,10 @@ private:
   uint64_t offset_;
 };
 
-struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation {
-  ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer,
-                        size_t *transferred)
-      : reader_(reader), buffer_(buffer),
-        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
-  }
+struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
+{
+  ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred)
+      : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
 
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run("
@@ -511,7 +542,8 @@ void BlockReaderImpl::AsyncReadBlock(
     const hadoop::hdfs::LocatedBlockProto &block,
     size_t offset,
     const MutableBuffers &buffers,
-    const std::function<void(const Status &, size_t)> handler) {
+    const std::function<void(const Status &, size_t)> handler)
+{
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
                           << FMT_THIS_ADDR << ") called");
 
@@ -520,12 +552,10 @@ void BlockReaderImpl::AsyncReadBlock(
   size_t size = asio::buffer_size(buffers);
 
-  m->Push(new RequestBlockContinuation(this, client_name,
-                                       &block.b(), size, offset))
+  m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset))
       .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
 
-  m->Run([handler] (const Status &status,
-                    const size_t totalBytesTransferred) {
+  m->Run([handler] (const Status &status, const size_t totalBytesTransferred) {
     handler(status, totalBytesTransferred);
   });
 }
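
Reviewer note (not part of the patch): the idiom applied throughout block_reader.cc is that every pipeline stage keeps its own copy of the connection shared_ptr (shared_conn_, copied from BlockReaderImpl::dn_) and captures a further copy (keep_conn_alive_) in the completion handler it hands to ASIO, so the DataNodeConnection cannot be destroyed while an asynchronous operation is still outstanding. The standalone sketch below illustrates why that works; FakeConnection, Stage, and pending are made-up stand-ins for the connection, a pipeline stage, and an in-flight ASIO operation, and nothing here is taken from the Hadoop sources.

#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Made-up stand-in for DataNodeConnection.
struct FakeConnection {
  ~FakeConnection() { std::cout << "connection destroyed\n"; }
  std::string Read() { return "packet"; }
};

// Made-up stand-in for an outstanding ASIO operation: the completion
// handler is parked here and invoked after the initiating objects are gone.
static std::function<void()> pending;

// Made-up stand-in for a pipeline stage such as ReadChecksum.
struct Stage {
  explicit Stage(std::shared_ptr<FakeConnection> conn)
      : shared_conn_(std::move(conn)) {}  // the stage keeps its own copy

  void Run() {
    // Copy the shared_ptr into the handler, mirroring keep_conn_alive_ in
    // the patch, so the connection outlives both the stage and its caller.
    auto keep_conn_alive = shared_conn_;
    pending = [keep_conn_alive]() {
      std::cout << "handler read: " << keep_conn_alive->Read() << "\n";
    };
  }

  std::shared_ptr<FakeConnection> shared_conn_;
};

int main() {
  {
    auto conn = std::make_shared<FakeConnection>();
    Stage stage(conn);
    stage.Run();
  }  // conn and stage are destroyed here while the "async op" is still in flight
  pending();          // the captured copy keeps the connection valid
  pending = nullptr;  // dropping the handler finally destroys the connection
  std::cout << "done\n";
  return 0;
}

Without the captured copy, the handler would have to reach back into a connection that may already have been torn down; that is the class of memory stomp the error-injection pass in test_libhdfs_mini_stress.c is meant to surface.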