HDFS-8952. InputStream.PositionRead() should be aware of available DNs. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-08-24 18:10:17 -07:00 committed by James Clampffer
parent 1efb677976
commit bc47acc9d9
6 changed files with 118 additions and 42 deletions

View File

@ -21,6 +21,7 @@
#include "libhdfspp/status.h"
#include <functional>
#include <set>
namespace hdfs {
@ -57,12 +58,23 @@ public:
class InputStream {
public:
/**
* Read data from a specific position. The handler returns the
* number of bytes has read.
* Read data from a specific position. The current implementation
* stops at the block boundary.
*
* @param buf the pointer to the buffer
* @param nbyte the size of the buffer
* @param offset the offset in the file
* @param excluded_datanodes the UUID of the datanodes that should
* not be used in this read
*
* The handler returns the datanode that serves the block and the number of
* bytes that have been read.
**/
virtual void
PositionRead(void *buf, size_t nbyte, size_t offset,
const std::function<void(const Status &, size_t)> &handler) = 0;
PositionRead(void *buf, size_t nbyte, uint64_t offset,
const std::set<std::string> &excluded_datanodes,
const std::function<void(const Status &, const std::string &,
size_t)> &handler) = 0;
virtual ~InputStream();
};

View File

@ -66,7 +66,6 @@ class Status {
// state_[4] == code
// state_[5..] == message
const char* state_;
friend class StatusHelper;
enum Code {
kOk = 0,

View File

@ -46,16 +46,20 @@ class InputStreamImpl : public InputStream {
public:
InputStreamImpl(FileSystemImpl *fs,
const ::hadoop::hdfs::LocatedBlocksProto *blocks);
virtual void PositionRead(
void *buf, size_t nbyte, size_t offset,
const std::function<void(const Status &, size_t)> &handler) override;
virtual void
PositionRead(void *buf, size_t nbyte, uint64_t offset,
const std::set<std::string> &excluded_datanodes,
const std::function<void(const Status &, const std::string &,
size_t)> &handler) override;
template <class MutableBufferSequence, class Handler>
void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
const std::set<std::string> &excluded_datanodes,
const Handler &handler);
template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
void AsyncReadBlock(const std::string &client_name,
const hadoop::hdfs::LocatedBlockProto &block,
size_t offset, const MutableBufferSequence &buffers,
const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
const MutableBufferSequence &buffers,
const Handler &handler);
private:

View File

@ -37,8 +37,10 @@ InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
}
void InputStreamImpl::PositionRead(
void *buf, size_t nbyte, size_t offset,
const std::function<void(const Status &, size_t)> &handler) {
AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler);
void *buf, size_t nbyte, uint64_t offset,
const std::set<std::string> &excluded_datanodes,
const std::function<void(const Status &, const std::string &, size_t)>
&handler) {
AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler);
}
}

View File

@ -33,7 +33,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
struct State {
std::unique_ptr<asio::ip::tcp::socket> conn_;
std::unique_ptr<Reader> reader_;
std::vector<asio::ip::tcp::endpoint> endpoints_;
std::array<asio::ip::tcp::endpoint, 1> endpoints_;
size_t transferred_;
Reader *reader() { return reader_.get(); }
size_t *transferred() { return &transferred_; }
@ -41,17 +41,15 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
};
static continuation::Pipeline<State> *
CreatePipeline(::asio::io_service *io_service,
const ::hadoop::hdfs::LocatedBlockProto &b) {
const ::hadoop::hdfs::DatanodeInfoProto &dn) {
using namespace ::asio::ip;
auto m = continuation::Pipeline<State>::Create();
auto &s = m->state();
s.conn_.reset(new tcp::socket(*io_service));
s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
for (auto &loc : b.locs()) {
auto datanode = loc.id();
s.endpoints_.push_back(tcp::endpoint(
address::from_string(datanode.ipaddr()), datanode.xferport()));
}
auto datanode = dn.id();
s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
datanode.xferport());
m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
s.endpoints_.end()));
@ -125,12 +123,11 @@ private:
};
template <class MutableBufferSequence, class Handler>
void InputStreamImpl::AsyncPreadSome(size_t offset,
const MutableBufferSequence &buffers,
const Handler &handler) {
void InputStreamImpl::AsyncPreadSome(
size_t offset, const MutableBufferSequence &buffers,
const std::set<std::string> &excluded_datanodes, const Handler &handler) {
using ::hadoop::hdfs::DatanodeInfoProto;
using ::hadoop::hdfs::LocatedBlockProto;
namespace ip = ::asio::ip;
using ::asio::ip::tcp;
auto it = std::find_if(
blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
@ -138,10 +135,21 @@ void InputStreamImpl::AsyncPreadSome(size_t offset,
});
if (it == blocks_.end()) {
handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
return;
} else if (!it->locs_size()) {
handler(Status::ResourceUnavailable("No datanodes available"), 0);
}
const DatanodeInfoProto *chosen_dn = nullptr;
for (int i = 0; i < it->locs_size(); ++i) {
const auto &di = it->locs(i);
if (!excluded_datanodes.count(di.id().datanodeuuid())) {
chosen_dn = &di;
break;
}
}
if (!chosen_dn) {
handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
return;
}
@ -150,28 +158,30 @@ void InputStreamImpl::AsyncPreadSome(size_t offset,
it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
AsyncReadBlock<RemoteBlockReaderTrait>(
fs_->rpc_engine().client_name(), *it, offset_within_block,
fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block,
asio::buffer(buffers, size_within_block), handler);
}
template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
void InputStreamImpl::AsyncReadBlock(
const std::string &client_name,
const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
const hadoop::hdfs::LocatedBlockProto &block,
const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
const MutableBufferSequence &buffers, const Handler &handler) {
typedef typename BlockReaderTrait::Reader Reader;
auto m =
BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), block);
BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
auto &s = m->state();
size_t size = asio::buffer_size(buffers);
m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
&block.b(), size, offset))
.Push(new ReadBlockContinuation<Reader, decltype(buffers)>(
s.reader(), buffers, s.transferred()));
m->Run([handler](const Status &status,
const std::string &dnid = dn.id().datanodeuuid();
m->Run([handler, dnid](const Status &status,
const typename BlockReaderTrait::State &state) {
handler(status, *state.transferred());
handler(status, dnid, *state.transferred());
});
}
}

View File

@ -20,6 +20,8 @@
#include <gmock/gmock.h>
using hadoop::common::TokenProto;
using hadoop::hdfs::DatanodeInfoProto;
using hadoop::hdfs::DatanodeIDProto;
using hadoop::hdfs::ExtendedBlockProto;
using hadoop::hdfs::LocatedBlockProto;
using hadoop::hdfs::LocatedBlocksProto;
@ -57,7 +59,7 @@ template <class Trait> struct MockBlockReaderTrait {
};
static continuation::Pipeline<State> *
CreatePipeline(::asio::io_service *, const LocatedBlockProto &) {
CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) {
auto m = continuation::Pipeline<State>::Create();
*m->state().transferred() = 0;
Trait::InitializeMockReader(m->state().reader());
@ -69,6 +71,7 @@ template <class Trait> struct MockBlockReaderTrait {
TEST(InputStreamTest, TestReadSingleTrunk) {
LocatedBlocksProto blocks;
LocatedBlockProto block;
DatanodeInfoProto dn;
char buf[4096] = {
0,
};
@ -82,14 +85,14 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
.WillOnce(InvokeArgument<5>(Status::OK()));
EXPECT_CALL(*reader, async_read_some(_,_))
EXPECT_CALL(*reader, async_read_some(_, _))
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
}
};
is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
"client", block, 0, asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, size_t transferred) {
"client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, const std::string &, size_t transferred) {
stat = status;
read = transferred;
});
@ -101,6 +104,7 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
TEST(InputStreamTest, TestReadMultipleTrunk) {
LocatedBlocksProto blocks;
LocatedBlockProto block;
DatanodeInfoProto dn;
char buf[4096] = {
0,
};
@ -114,15 +118,16 @@ TEST(InputStreamTest, TestReadMultipleTrunk) {
EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
.WillOnce(InvokeArgument<5>(Status::OK()));
EXPECT_CALL(*reader, async_read_some(_,_))
EXPECT_CALL(*reader, async_read_some(_, _))
.Times(4)
.WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
}
};
is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
"client", block, 0, asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, size_t transferred) {
"client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, const std::string &,
size_t transferred) {
stat = status;
read = transferred;
});
@ -134,6 +139,7 @@ TEST(InputStreamTest, TestReadMultipleTrunk) {
TEST(InputStreamTest, TestReadError) {
LocatedBlocksProto blocks;
LocatedBlockProto block;
DatanodeInfoProto dn;
char buf[4096] = {
0,
};
@ -147,7 +153,7 @@ TEST(InputStreamTest, TestReadError) {
EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
.WillOnce(InvokeArgument<5>(Status::OK()));
EXPECT_CALL(*reader, async_read_some(_,_))
EXPECT_CALL(*reader, async_read_some(_, _))
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
@ -156,8 +162,9 @@ TEST(InputStreamTest, TestReadError) {
};
is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
"client", block, 0, asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, size_t transferred) {
"client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, const std::string &,
size_t transferred) {
stat = status;
read = transferred;
});
@ -166,6 +173,48 @@ TEST(InputStreamTest, TestReadError) {
read = 0;
}
// Verifies that AsyncPreadSome() honors the excluded-datanode set: when the
// only replica of the target block lives on an excluded datanode, the read
// must complete with Status::ResourceUnavailable and transfer zero bytes.
TEST(InputStreamTest, TestExcludeDataNode) {
LocatedBlocksProto blocks;
// Build a single located block covering bytes [0, 4096) of the file,
// hosted by exactly one datanode whose UUID is "foo".
LocatedBlockProto *block = blocks.add_blocks();
ExtendedBlockProto *b = block->mutable_b();
b->set_poolid("");
b->set_blockid(1);
b->set_generationstamp(1);
b->set_numbytes(4096);
DatanodeInfoProto *di = block->add_locs();
DatanodeIDProto *dnid = di->mutable_id();
dnid->set_datanodeuuid("foo");
char buf[4096] = {
0,
};
IoServiceImpl io_service;
FileSystemImpl fs(&io_service);
InputStreamImpl is(&fs, &blocks);
Status stat;
size_t read = 0;
// NOTE(review): this Trait mirrors the mock-reader setup of the other tests,
// but AsyncPreadSome() is not parameterized on it, so InitializeMockReader
// is never invoked here — the read should fail before any reader is built.
struct Trait {
static void InitializeMockReader(MockReader *reader) {
EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
.WillOnce(InvokeArgument<5>(Status::OK()));
EXPECT_CALL(*reader, async_read_some(_, _))
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
}
};
// Exclude the sole replica's datanode so no location can serve the read.
std::set<std::string> excluded_dn({"foo"});
is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn,
[&stat, &read](const Status &status, const std::string &, size_t transferred) {
stat = status;
read = transferred;
});
// ResourceUnavailable is surfaced as resource_unavailable_try_again, and no
// bytes may have been transferred.
ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
ASSERT_EQ(0UL, read);
}
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.