HDFS-9265. InputStreamImpl should hold a shared_ptr of the BlockReader. Contributed by James Clampffer.

This commit is contained in:
Haohui Mai 2015-10-21 11:49:02 -07:00 committed by James Clampffer
parent 08794423e8
commit a14d1f741f

View File

@ -33,7 +33,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
typedef RemoteBlockReader<asio::ip::tcp::socket> Reader; typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
struct State { struct State {
std::unique_ptr<asio::ip::tcp::socket> conn_; std::unique_ptr<asio::ip::tcp::socket> conn_;
std::unique_ptr<Reader> reader_; std::shared_ptr<Reader> reader_;
std::array<asio::ip::tcp::endpoint, 1> endpoints_; std::array<asio::ip::tcp::endpoint, 1> endpoints_;
size_t transferred_; size_t transferred_;
Reader *reader() { return reader_.get(); } Reader *reader() { return reader_.get(); }
@ -47,7 +47,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
auto m = continuation::Pipeline<State>::Create(); auto m = continuation::Pipeline<State>::Create();
auto &s = m->state(); auto &s = m->state();
s.conn_.reset(new tcp::socket(*io_service)); s.conn_.reset(new tcp::socket(*io_service));
s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get())); s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
auto datanode = dn.id(); auto datanode = dn.id();
s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()), s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
datanode.xferport()); datanode.xferport());