diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
index 039daf787d..3f650ce6b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -27,6 +27,7 @@
 #include
 #include
 #include
+#include
 
 namespace hdfs {
 namespace asio_continuation {
@@ -36,7 +37,7 @@ using namespace continuation;
 template <class Stream, class MutableBufferSequence>
 class ReadContinuation : public Continuation {
 public:
-  ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
+  ReadContinuation(std::shared_ptr<Stream>& stream, const MutableBufferSequence &buffer)
       : stream_(stream), buffer_(buffer) {}
   virtual void Run(const Next &next) override {
     auto handler =
@@ -45,14 +46,16 @@ public:
   }
 
 private:
-  Stream *stream_;
+  // prevent construction from raw ptr
+  ReadContinuation(Stream *stream, MutableBufferSequence &buffer);
+  std::shared_ptr<Stream> stream_;
   MutableBufferSequence buffer_;
 };
 
 template <class Stream, class ConstBufferSequence>
 class WriteContinuation : public Continuation {
 public:
-  WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
+  WriteContinuation(std::shared_ptr<Stream>& stream, const ConstBufferSequence &buffer)
       : stream_(stream), buffer_(buffer) {}
 
   virtual void Run(const Next &next) override {
@@ -62,7 +65,9 @@ public:
   }
 
 private:
-  Stream *stream_;
+  // prevent construction from raw ptr
+  WriteContinuation(Stream *stream, ConstBufferSequence &buffer);
+  std::shared_ptr<Stream> stream_;
   ConstBufferSequence buffer_;
 };
 
@@ -117,13 +122,13 @@ private:
 };
 
 template <class Stream, class ConstBufferSequence>
-static inline Continuation *Write(Stream *stream,
+static inline Continuation *Write(std::shared_ptr<Stream> stream,
                                   const ConstBufferSequence &buffer) {
   return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
 }
 
 template <class Stream, class MutableBufferSequence>
-static inline Continuation *Read(Stream *stream,
+static inline Continuation *Read(std::shared_ptr<Stream> stream,
                                  const MutableBufferSequence &buffer) {
   return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
index 54caeed2eb..3dc75c23ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -94,13 +94,14 @@ struct WriteDelimitedPBMessageContinuation : Continuation {
       : stream_(stream), msg_(msg) {}
 
   virtual void Run(const Next &next) override {
-    namespace pbio = google::protobuf::io;
-    int size = msg_->ByteSize();
-    buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
-    pbio::StringOutputStream ss(&buf_);
-    pbio::CodedOutputStream os(&ss);
-    os.WriteVarint32(size);
-    msg_->SerializeToCodedStream(&os);
+    bool success = true;
+    buf_ = SerializeDelimitedProtobufMessage(msg_, &success);
+
+    if(!success) {
+      next(Status::Error("Unable to serialize protobuf message."));
+      return;
+    }
+
     asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } );
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
index eaef2d08c7..cc65ec27bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -18,12 +18,44 @@
 
 #include "common/util.h"
 
+#include
+
 namespace hdfs {
 
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg) {
+  uint32_t size = 0;
+  in->ReadVarint32(&size);
+  auto limit = in->PushLimit(size);
+  bool res = msg->ParseFromCodedStream(in);
+  in->PopLimit(limit);
+
+  return res;
+}
+
+
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err) {
+  namespace pbio = ::google::protobuf::io;
+
+  std::string buf;
+
+  int size = msg->ByteSize();
+  buf.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+  pbio::StringOutputStream ss(&buf);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteVarint32(size);
+
+  if(err)
+    *err = msg->SerializeToCodedStream(&os);
+
+  return buf;
+}
+
+
 std::string GetRandomClientName() {
-  unsigned char buf[6] = {
-      0,
-  };
+  unsigned char buf[6];
+  RAND_pseudo_bytes(buf, sizeof(buf));
 
   std::stringstream ss;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
index 60f70e54b4..870ce2e7e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -28,6 +28,7 @@
 #include
 #include
 
+
 namespace hdfs {
 
 static inline Status ToStatus(const ::asio::error_code &ec) {
@@ -38,32 +39,30 @@ static inline Status ToStatus(const ::asio::error_code &ec) {
   }
 }
 
-static inline int DelimitedPBMessageSize(
-    const ::google::protobuf::MessageLite *msg) {
+// Determine size of buffer that needs to be allocated in order to serialize msg
+// in delimited format
+static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
   size_t size = msg->ByteSize();
   return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
 }
 
-static inline void ReadDelimitedPBMessage(
-    ::google::protobuf::io::CodedInputStream *in,
-    ::google::protobuf::MessageLite *msg) {
-  uint32_t size = 0;
-  in->ReadVarint32(&size);
-  auto limit = in->PushLimit(size);
-  msg->ParseFromCodedStream(in);
-  in->PopLimit(limit);
-}
+// Construct msg from the input held in the CodedInputStream
+// return false on failure, otherwise return true
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg);
+
+// Serialize msg into a delimited form (java protobuf compatible)
+// err, if not null, will be set to false on failure
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err);
 
 std::string Base64Encode(const std::string &src);
 
-/*
- * Returns a new high-entropy client name
- */
+// Return a new high-entropy client name
 std::string GetRandomClientName();
 
-/* Returns true if _someone_ is holding the lock (not necessarily this thread,
- * but a std::mutex doesn't track which thread is holding the lock)
- */
+// Returns true if _someone_ is holding the lock (not necessarily this thread,
+// but a std::mutex doesn't track which thread is holding the lock)
 template <class T>
 bool lock_held(T & mutex) {
   bool result = !mutex.try_lock();
@@ -72,8 +71,6 @@ bool lock_held(T & mutex) {
   return result;
 }
 
-
-
 }
 
 #endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
index 4ee86c2b6d..50529511d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -20,6 +20,7 @@
 #include "common/continuation/continuation.h"
 #include "common/continuation/asio.h"
 #include "common/logging.h"
+#include "common/util.h"
 
 #include
 
@@ -55,6 +56,9 @@ ReadBlockProto(const std::string &client_name, bool verify_checksum,
   return p;
 }
 
+
+static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
+
 void BlockReaderImpl::AsyncRequestBlock(
     const std::string &client_name,
     const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
@@ -78,17 +82,24 @@ void BlockReaderImpl::AsyncRequestBlock(
   auto m = continuation::Pipeline<State>::Create(cancel_state_);
   State *s = &m->state();
 
-  s->header.insert(s->header.begin(),
-                   {0, kDataTransferVersion, Operation::kReadBlock});
-  s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
-                                        dn_->token_.get(), block, length, offset));
+  s->request = ReadBlockProto(client_name, options_.verify_checksum,
+                              dn_->token_.get(), block, length, offset);
+
+  s->header = std::string((const char*)unsecured_request_block_header, 3);
+
+  bool serialize_success = true;
+  s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success);
+
+  if(!serialize_success) {
+    handler(Status::Error("Unable to serialize protobuf message"));
+    return;
+  }
 
   auto read_pb_message = new continuation::ReadDelimitedPBMessageContinuation(
       dn_, &s->response);
 
-  m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header)))
-      .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request))
+  m->Push(asio_continuation::Write(dn_, asio::buffer(s->header)))
      .Push(read_pb_message);
 
   m->Run([this, handler, offset](const Status &status, const State &s) {
     Status stat = status;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
index e2cd790f0d..77e618dd7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -117,7 +117,7 @@ void DataTransferSaslStream::Handshake(const Handler &next) {
 
   DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
 
-  m->Push(Write(stream_.get(), kMagicNumberBuffer))
+  m->Push(Write(stream_, kMagicNumberBuffer))
       .Push(WriteDelimitedPBMessage(stream_, &s->req0))
       .Push(new ReadSaslMessage(stream_, &s->resp0))
       .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
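
The sketch below is not part of the patch; it is a minimal, hypothetical round-trip check showing how the two helpers this change adds to common/util.h are meant to compose: SerializeDelimitedProtobufMessage emits a varint32 length prefix followed by the message body, and ReadDelimitedPBMessage consumes exactly that framing from a CodedInputStream. The function name RoundTripDelimitedPB and the standalone includes are assumptions made for illustration only.

#include <string>

#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/message_lite.h>

#include "common/util.h"

namespace hdfs {

// Serialize `in` in delimited form, then parse the bytes back into `out`.
// Returns true only if both the write and the read succeed.
bool RoundTripDelimitedPB(const ::google::protobuf::MessageLite *in,
                          ::google::protobuf::MessageLite *out) {
  bool ok = true;
  std::string wire = SerializeDelimitedProtobufMessage(in, &ok);
  if (!ok)
    return false;

  ::google::protobuf::io::ArrayInputStream raw(wire.data(),
                                               static_cast<int>(wire.size()));
  ::google::protobuf::io::CodedInputStream coded(&raw);
  return ReadDelimitedPBMessage(&coded, out);
}

} // namespace hdfs

This is the same framing block_reader.cc now relies on when it appends the serialized request to the three-byte read-block header before writing both to the datanode connection.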
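
Separately, a generic sketch (assumed names, not from the patch) of the guard ReadContinuation and WriteContinuation now use: declaring a private, never-defined constructor overload that takes a raw pointer keeps callers from accidentally passing one, while the std::shared_ptr overload stays available and keeps the stream alive for as long as the continuation holds it.

#include <memory>

class Widget {};

class Holder {
public:
  explicit Holder(std::shared_ptr<Widget>& w) : w_(w) {}

private:
  // prevent construction from raw ptr
  Holder(Widget *w);
  std::shared_ptr<Widget> w_;
};

int main() {
  auto w = std::make_shared<Widget>();
  Holder ok(w);                 // compiles: ownership is shared with the holder
  // Holder bad(new Widget());  // rejected: the raw-pointer overload is private
  (void)ok;
  return 0;
}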