diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index cc46cba654..9090575b18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -204,6 +204,19 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+            <execution>
+              <id>native_tests</id>
+              <phase>test</phase>
+              <goals><goal>run</goal></goals>
+              <configuration>
+                <skip>${skipTests}</skip>
+                <target>
+                  <exec executable="ctest" failonerror="true"
+                        dir="${project.build.directory}/">
+                  </exec>
+                </target>
+              </configuration>
+            </execution>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt
index ef14183b9b..309e99f98d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/CMakeLists.txt
@@ -18,4 +18,6 @@
cmake_minimum_required(VERSION 2.8 FATAL_ERROR)
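+# Enable CTest so tests registered via add_test() in subdirectories
+# can be run with "ctest" (which the Maven native_tests execution invokes).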
+enable_testing()
+
add_subdirectory(libhdfspp)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
index cae786cdfa..51e31223f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
@@ -51,3 +51,4 @@ include_directories(
add_subdirectory(third_party/gmock-1.7.0)
add_subdirectory(lib)
+add_subdirectory(tests)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
new file mode 100644
index 0000000000..cd5e1b17a9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
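+# Mock infrastructure shared by the tests. An OBJECT library compiles the
+# sources once; each test binary pulls the objects in through the
+# $<TARGET_OBJECTS:...> generator expression (requires CMake >= 2.8.8).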
+add_library(test_common OBJECT mock_connection.cc)
+add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
+target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} gmock_main)
+add_test(remote_block_reader remote_block_reader_test)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
new file mode 100644
index 0000000000..e1dfdc7c29
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+namespace hdfs {
+
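+// Defined out of line so the class's vtable is emitted in this translation unit.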
+MockConnectionBase::~MockConnectionBase() {}
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
new file mode 100644
index 0000000000..e917e9d27f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+
+#include <asio/buffer.hpp>
+#include <asio/error_code.hpp>
+#include <asio/streambuf.hpp>
+
+#include <algorithm>
+#include <string>
+#include <utility>
+
+namespace hdfs {
+
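+// Base class for mock connections. It implements the asio-style
+// async_read_some / async_write_some interface expected by the block
+// reader; subclasses supply the incoming bytes through Produce().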
+class MockConnectionBase {
+public:
+ virtual ~MockConnectionBase();
+  typedef std::pair<asio::error_code, std::string> ProducerResult;
+  template <class MutableBufferSequence, class Handler>
+ void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+ if (produced_.size() == 0) {
+ ProducerResult r = Produce();
+      if (r.first) {
+        handler(r.first, 0);
+        return;
+      }
+ asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
+ asio::buffer_copy(data, asio::buffer(r.second));
+ produced_.commit(r.second.size());
+ }
+
+ size_t len = std::min(asio::buffer_size(buf), produced_.size());
+ asio::buffer_copy(buf, produced_.data());
+ produced_.consume(len);
+ handler(asio::error_code(), len);
+ }
+
+  template <class ConstBufferSequence, class Handler>
+ void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+ // CompletionResult res = OnWrite(buf);
+ handler(asio::error_code(), asio::buffer_size(buf));
+ }
+
+protected:
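+  // Supplies the next (error, payload) pair for the reader to consume;
+  // tests typically provide this via a gmock MOCK_METHOD.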
+ virtual ProducerResult Produce() = 0;
+
+private:
+ asio::streambuf produced_;
+};
+}
+
+#endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
new file mode 100644
index 0000000000..92cbc8fa2e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+#include "datatransfer.pb.h"
+#include "common/util.h"
+#include "reader/block_reader.h"
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <gmock/gmock.h>
+
+#include <arpa/inet.h>
+
+#include <memory>
+#include <string>
+#include <utility>
+
+using namespace hdfs;
+
+using ::hadoop::common::TokenProto;
+using ::hadoop::hdfs::BlockOpResponseProto;
+using ::hadoop::hdfs::ChecksumProto;
+using ::hadoop::hdfs::ExtendedBlockProto;
+using ::hadoop::hdfs::PacketHeaderProto;
+using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+
+using ::asio::buffer;
+using ::asio::error_code;
+using ::asio::mutable_buffers_1;
+using ::testing::Return;
+using std::make_pair;
+using std::string;
+
+namespace pb = ::google::protobuf;
+namespace pbio = pb::io;
+
+namespace hdfs {
+
+class MockDNConnection : public MockConnectionBase {
+public:
+ MOCK_METHOD0(Produce, ProducerResult());
+};
+}
+
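+// Serialize |msg| the way the HDFS wire protocol frames protobufs: a
+// varint32 length prefix followed by the message body.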
+static inline string ToDelimitedString(const pb::MessageLite *msg) {
+ string res;
+ res.reserve(hdfs::DelimitedPBMessageSize(msg));
+ pbio::StringOutputStream os(&res);
+ pbio::CodedOutputStream out(&os);
+ out.WriteVarint32(msg->ByteSize());
+ msg->SerializeToCodedStream(&out);
+ return res;
+}
+
+static inline std::pair<error_code, string> Produce(const std::string &s) {
+ return make_pair(error_code(), s);
+}
+
+static inline std::pair<error_code, string>
+ProducePacket(const std::string &data, const std::string &checksum,
+ int offset_in_block, int seqno, bool last_packet) {
+ PacketHeaderProto proto;
+ proto.set_datalen(data.size());
+ proto.set_offsetinblock(offset_in_block);
+ proto.set_seqno(seqno);
+ proto.set_lastpacketinblock(last_packet);
+
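+  // Frame the packet: a 4-byte big-endian payload length (which counts the
+  // length field itself, the checksums, and the data), then a 2-byte
+  // big-endian header length, then the serialized PacketHeaderProto.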
+ char prefix[6];
+  *reinterpret_cast<unsigned *>(prefix) =
+      htonl(data.size() + checksum.size() + sizeof(int));
+  *reinterpret_cast<short *>(prefix + sizeof(int)) = htons(proto.ByteSize());
+ std::string payload(prefix, sizeof(prefix));
+ payload.reserve(payload.size() + proto.ByteSize() + checksum.size() +
+ data.size());
+ proto.AppendToString(&payload);
+ payload += checksum;
+ payload += data;
+ return std::make_pair(error_code(), std::move(payload));
+}
+
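+// Connect the block reader to the mock connection, then issue a single
+// async_read_some into |buf|, recording the resulting status and byte count.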
+static std::shared_ptr<RemoteBlockReader<MockDNConnection>>
+ReadContent(MockDNConnection *conn, TokenProto *token,
+ const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
+ const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
+ BlockReaderOptions options;
+  auto reader =
+      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, conn);
+ Status result;
+ reader->async_connect(
+ "libhdfs++", token, &block, length, offset,
+ [buf, reader, status, transferred](const Status &stat) {
+ if (!stat.ok()) {
+ *status = stat;
+ } else {
+ reader->async_read_some(
+ buf, [status, transferred](const Status &stat, size_t t) {
+ *transferred = t;
+ *status = stat;
+ });
+ }
+ });
+ return reader;
+}
+
+TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
+ static const size_t kChunkSize = 512;
+ static const string kChunkData(kChunkSize, 'a');
+ MockDNConnection conn;
+ BlockOpResponseProto block_op_resp;
+
+ block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+ EXPECT_CALL(conn, Produce())
+ .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+ .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+ ExtendedBlockProto block;
+ std::string data(kChunkSize, 0);
+ size_t transferred = 0;
+ Status stat;
+ ReadContent(&conn, nullptr, block, kChunkSize, 0,
+              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
+ &transferred);
+ ASSERT_TRUE(stat.ok());
+ ASSERT_EQ(kChunkSize, transferred);
+ ASSERT_EQ(kChunkData, data);
+}
+
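+// Reading from an offset inside a chunk: only the requested byte range
+// (not the leading checksum-aligned bytes) may be surfaced to the caller.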
+TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
+ static const size_t kChunkSize = 1024;
+ static const size_t kLength = kChunkSize / 4 * 3;
+ static const size_t kOffset = kChunkSize / 4;
+ static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
+
+ MockDNConnection conn;
+ BlockOpResponseProto block_op_resp;
+ ReadOpChecksumInfoProto *checksum_info =
+ block_op_resp.mutable_readopchecksuminfo();
+ checksum_info->set_chunkoffset(0);
+ ChecksumProto *checksum = checksum_info->mutable_checksum();
+ checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL);
+ checksum->set_bytesperchecksum(512);
+ block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+ EXPECT_CALL(conn, Produce())
+ .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+ .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
+
+ ExtendedBlockProto block;
+ string data(kLength, 0);
+ size_t transferred = 0;
+ Status stat;
+ ReadContent(&conn, nullptr, block, data.size(), kOffset,
+              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
+ &transferred);
+ ASSERT_TRUE(stat.ok());
+ ASSERT_EQ(kLength, transferred);
+ ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+}
+
+TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
+ static const size_t kChunkSize = 1024;
+ static const string kChunkData(kChunkSize, 'a');
+
+ MockDNConnection conn;
+ BlockOpResponseProto block_op_resp;
+ block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+ EXPECT_CALL(conn, Produce())
+ .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+ .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
+ .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
+
+ ExtendedBlockProto block;
+ string data(kChunkSize, 0);
+ size_t transferred = 0;
+ Status stat;
+  mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
+ auto reader = ReadContent(&conn, nullptr, block, data.size(), 0, buf, &stat,
+ &transferred);
+ ASSERT_TRUE(stat.ok());
+ ASSERT_EQ(kChunkSize, transferred);
+ ASSERT_EQ(kChunkData, data);
+
+ data.clear();
+ data.resize(kChunkSize);
+ transferred = 0;
+
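+  // The mock connection invokes completion handlers synchronously, so the
+  // second packet is consumed and these assertions run before this returns.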
+ reader->async_read_some(buf, [&data](const Status &stat, size_t transferred) {
+ ASSERT_TRUE(stat.ok());
+ ASSERT_EQ(kChunkSize, transferred);
+ ASSERT_EQ(kChunkData, data);
+ });
+}
+
+int main(int argc, char *argv[]) {
+ // The following line must be executed to initialize Google Mock
+ // (and Google Test) before running the tests.
+ ::testing::InitGoogleMock(&argc, argv);
+ return RUN_ALL_TESTS();
+}