From 50aa4ea7760bf2b0c59c467861c7ed2bdc1ea97e Mon Sep 17 00:00:00 2001
From: Bob Hansen
Date: Mon, 6 Jun 2016 10:45:05 -0400
Subject: [PATCH] HDFS-10454: libhdfspp: Move NameNodeOp to a separate file.
 Contributed by Anatoli Shein.

---
 .../native/libhdfspp/lib/fs/CMakeLists.txt    |   2 +-
 .../native/libhdfspp/lib/fs/filesystem.cc     | 170 ---------------
 .../main/native/libhdfspp/lib/fs/filesystem.h |  55 +----
 .../libhdfspp/lib/fs/namenode_operations.cc   | 200 ++++++++++++++++++
 .../libhdfspp/lib/fs/namenode_operations.h    |  76 +++++++
 5 files changed, 278 insertions(+), 225 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h

diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
index 7d750b2aae..0bce70d834 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -16,6 +16,6 @@
 # limitations under the License.
 #
 
-add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc)
+add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
 add_dependencies(fs_obj proto)
 add_library(fs $<TARGET_OBJECTS:fs_obj>)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index eace6fa98a..b05bc3d541 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -17,11 +17,6 @@
  */
 
 #include "filesystem.h"
-#include "common/continuation/asio.h"
-#include "common/util.h"
-#include "common/logging.h"
-
-#include <asio/ip/tcp.hpp>
 
 #include <functional>
 #include <limits>
@@ -42,171 +37,6 @@
 using ::asio::ip::tcp;
 
 static constexpr uint16_t kDefaultPort = 8020;
-
-/*****************************************************************************
- *                     NAMENODE OPERATIONS
- ****************************************************************************/
-
-void NameNodeOperations::Connect(const std::string &cluster_name,
-                                 const std::string &server,
-                                 const std::string &service,
-                                 std::function<void(const Status &)> &&handler) {
-  using namespace asio_continuation;
-  typedef std::vector<tcp::endpoint> State;
-  auto m = Pipeline<State>::Create();
-  m->Push(Resolve(io_service_, server, service,
-                  std::back_inserter(m->state())))
-      .Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
-        engine_.Connect(cluster_name, m->state(), next);
-      }));
-  m->Run([this, handler](const Status &status, const State &) {
-    handler(status);
-  });
-}
-
-void NameNodeOperations::GetBlockLocations(const std::string & path,
-  std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
-{
-  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
-  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
-
-  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
-                         << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
-
-  GetBlockLocationsRequestProto req;
-  req.set_src(path);
-  req.set_offset(0);
-  req.set_length(std::numeric_limits<long long>::max());
-
-  auto resp = std::make_shared<GetBlockLocationsResponseProto>();
-
-  namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
-    if (stat.ok()) {
-      auto file_info = std::make_shared<struct FileInfo>();
-      auto locations = resp->locations();
-
-      file_info->file_length_ = locations.filelength();
-      file_info->last_block_complete_ = locations.islastblockcomplete();
-      file_info->under_construction_ = locations.underconstruction();
-
-      for (const auto &block : locations.blocks()) {
-        file_info->blocks_.push_back(block);
-      }
-
-      if (!locations.islastblockcomplete() &&
-          locations.has_lastblock() && locations.lastblock().b().numbytes()) {
-        file_info->blocks_.push_back(locations.lastblock());
-        file_info->file_length_ += locations.lastblock().b().numbytes();
-      }
-
-      handler(stat, file_info);
-    } else {
-      handler(stat, nullptr);
-    }
-  });
-}
-
-void NameNodeOperations::GetFileInfo(const std::string & path,
-  std::function<void(const Status &, const StatInfo &)> handler)
-{
-  using ::hadoop::hdfs::GetFileInfoRequestProto;
-  using ::hadoop::hdfs::GetFileInfoResponseProto;
-
-  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
-                         << FMT_THIS_ADDR << ", path=" << path << ") called");
-
-  GetFileInfoRequestProto req;
-  req.set_src(path);
-
-  auto resp = std::make_shared<GetFileInfoResponseProto>();
-
-  namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
-    if (stat.ok()) {
-      // For non-existant files, the server will respond with an OK message but
-      // no fs in the protobuf.
-      if(resp -> has_fs()){
-        struct StatInfo stat_info;
-        stat_info.path=path;
-        HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
-        handler(stat, stat_info);
-      } else {
-        std::string errormsg = "No such file or directory: " + path;
-        Status statNew = Status::PathNotFound(errormsg.c_str());
-        handler(statNew, StatInfo());
-      }
-    } else {
-      handler(stat, StatInfo());
-    }
-  });
-}
-
-void NameNodeOperations::GetListing(
-    const std::string & path,
-    std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
-    const std::string & start_after) {
-  using ::hadoop::hdfs::GetListingRequestProto;
-  using ::hadoop::hdfs::GetListingResponseProto;
-
-  LOG_TRACE(
-      kFileSystem,
-      << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
-
-  GetListingRequestProto req;
-  req.set_src(path);
-  req.set_startafter(start_after.c_str());
-  req.set_needlocation(false);
-
-  auto resp = std::make_shared<GetListingResponseProto>();
-
-  namenode_.GetListing(
-      &req,
-      resp,
-      [resp, handler, path](const Status &stat) {
-        if (stat.ok()) {
-          if(resp -> has_dirlist()){
-            std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
-            for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
-              StatInfo si;
-              si.path=fs.path();
-              HdfsFileStatusProtoToStatInfo(si, fs);
-              stat_infos->push_back(si);
-            }
-            handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
-          } else {
-            std::string errormsg = "No such file or directory: " + path;
-            Status statNew = Status::PathNotFound(errormsg.c_str());
-            std::shared_ptr<std::vector<StatInfo>> stat_infos;
-            handler(statNew, stat_infos, false);
-          }
-        } else {
-          std::shared_ptr<std::vector<StatInfo>> stat_infos;
-          handler(stat, stat_infos, false);
-        }
-      });
-}
-
-
-void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
-  engine_.SetFsEventCallback(callback);
-}
-
-void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
-    hdfs::StatInfo & stat_info,
-    const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
-  stat_info.file_type = fs.filetype();
-  stat_info.length = fs.length();
-  stat_info.permissions = fs.permission().perm();
-  stat_info.owner = fs.owner();
-  stat_info.group = fs.group();
-  stat_info.modification_time = fs.modification_time();
-  stat_info.access_time = fs.access_time();
-  stat_info.symlink = fs.symlink();
-  stat_info.block_replication = fs.block_replication();
-  stat_info.blocksize = fs.blocksize();
-  stat_info.fileid = fs.fileid();
-  stat_info.children_num = fs.childrennum();
-}
-
 /*****************************************************************************
  *                    FILESYSTEM BASE CLASS
  ****************************************************************************/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 869fd2dc07..8f795fb0e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -19,71 +19,18 @@
 #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 
 #include "filehandle.h"
-#include "common/libhdfs_events_impl.h"
-#include "common/hdfs_public_api.h"
-#include "common/async_stream.h"
-#include "common/new_delete.h"
 #include "hdfspp/hdfspp.h"
 #include "fs/bad_datanode_tracker.h"
-#include "rpc/rpc_engine.h"
 #include "reader/block_reader.h"
 #include "reader/fileinfo.h"
-#include "hdfspp/statinfo.h"
-#include "ClientNamenodeProtocol.pb.h"
-#include "ClientNamenodeProtocol.hrpc.inl"
 
 #include "asio.hpp"
 
 #include <thread>
+#include "namenode_operations.h"
 
 namespace hdfs {
 
-/**
- * NameNodeConnection: abstracts the details of communicating with a NameNode
- * and the implementation of the communications protocol.
- *
- * Will eventually handle retry and failover.
- *
- * Threading model: thread-safe; all operations can be called concurrently
- * Lifetime: owned by a FileSystemImpl
- */
-class NameNodeOperations {
-public:
-  MEMCHECKED_CLASS(NameNodeOperations);
-  NameNodeOperations(::asio::io_service *io_service, const Options &options,
-            const std::string &client_name, const std::string &user_name,
-            const char *protocol_name, int protocol_version) :
-  io_service_(io_service),
-  engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
-  namenode_(& engine_) {}
-
-  void Connect(const std::string &cluster_name,
-               const std::string &server,
-               const std::string &service,
-               std::function<void(const Status &)> &&handler);
-
-  void GetBlockLocations(const std::string & path,
-    std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
-
-  void GetFileInfo(const std::string & path,
-    std::function<void(const Status &, const StatInfo &)> handler);
-
-  // start_after="" for initial call
-  void GetListing(const std::string & path,
-        std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
-        const std::string & start_after = "");
-
-  void SetFsEventCallback(fs_event_callback callback);
-
-private:
-  static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
-  static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
-
-  ::asio::io_service * io_service_;
-  RpcEngine engine_;
-  ClientNamenodeProtocol namenode_;
-};
-
 /*
  * FileSystem: The consumer's main point of interaction with the cluster as
  * a whole.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
new file mode 100644
index 0000000000..250e719a71
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+#include "common/continuation/asio.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <functional>
+#include <limits>
+#include <future>
+#include <tuple>
+#include <iostream>
+#include <pwd.h>
+
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+using ::asio::ip::tcp;
+
+namespace hdfs {
+
+/*****************************************************************************
+ *                     NAMENODE OPERATIONS
+ ****************************************************************************/
+
+void NameNodeOperations::Connect(const std::string &cluster_name,
+                                 const std::string &server,
+                                 const std::string &service,
+                                 std::function<void(const Status &)> &&handler) {
+  using namespace asio_continuation;
+  typedef std::vector<tcp::endpoint> State;
+  auto m = Pipeline<State>::Create();
+  m->Push(Resolve(io_service_, server, service,
+                  std::back_inserter(m->state())))
+      .Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
+        engine_.Connect(cluster_name, m->state(), next);
+      }));
+  m->Run([this, handler](const Status &status, const State &) {
+    handler(status);
+  });
+}
+
+void NameNodeOperations::GetBlockLocations(const std::string & path,
+  std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
+{
+  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
+                         << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
+
+  GetBlockLocationsRequestProto req;
+  req.set_src(path);
+  req.set_offset(0);
+  req.set_length(std::numeric_limits<long long>::max());
+
+  auto resp = std::make_shared<GetBlockLocationsResponseProto>();
+
+  namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
+    if (stat.ok()) {
+      auto file_info = std::make_shared<struct FileInfo>();
+      auto locations = resp->locations();
+
+      file_info->file_length_ = locations.filelength();
+      file_info->last_block_complete_ = locations.islastblockcomplete();
+      file_info->under_construction_ = locations.underconstruction();
+
+      for (const auto &block : locations.blocks()) {
+        file_info->blocks_.push_back(block);
+      }
+
+      if (!locations.islastblockcomplete() &&
+          locations.has_lastblock() && locations.lastblock().b().numbytes()) {
+        file_info->blocks_.push_back(locations.lastblock());
+        file_info->file_length_ += locations.lastblock().b().numbytes();
+      }
+
+      handler(stat, file_info);
+    } else {
+      handler(stat, nullptr);
+    }
+  });
+}
+
+void NameNodeOperations::GetFileInfo(const std::string & path,
+  std::function<void(const Status &, const StatInfo &)> handler)
+{
+  using ::hadoop::hdfs::GetFileInfoRequestProto;
+  using ::hadoop::hdfs::GetFileInfoResponseProto;
+
+  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
+                         << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  GetFileInfoRequestProto req;
+  req.set_src(path);
+
+  auto resp = std::make_shared<GetFileInfoResponseProto>();
+
+  namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok()) {
+      // For non-existant files, the server will respond with an OK message but
+      // no fs in the protobuf.
+      if(resp -> has_fs()){
+        struct StatInfo stat_info;
+        stat_info.path=path;
+        HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
+        handler(stat, stat_info);
+      } else {
+        std::string errormsg = "No such file or directory: " + path;
+        Status statNew = Status::PathNotFound(errormsg.c_str());
+        handler(statNew, StatInfo());
+      }
+    } else {
+      handler(stat, StatInfo());
+    }
+  });
+}
+
+void NameNodeOperations::GetListing(
+    const std::string & path,
+    std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
+    const std::string & start_after) {
+  using ::hadoop::hdfs::GetListingRequestProto;
+  using ::hadoop::hdfs::GetListingResponseProto;
+
+  LOG_TRACE(
+      kFileSystem,
+      << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  GetListingRequestProto req;
+  req.set_src(path);
+  req.set_startafter(start_after.c_str());
+  req.set_needlocation(false);
+
+  auto resp = std::make_shared<GetListingResponseProto>();
+
+  namenode_.GetListing(
+      &req,
+      resp,
+      [resp, handler, path](const Status &stat) {
+        if (stat.ok()) {
+          if(resp -> has_dirlist()){
+            std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
+            for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
+              StatInfo si;
+              si.path=fs.path();
+              HdfsFileStatusProtoToStatInfo(si, fs);
+              stat_infos->push_back(si);
+            }
+            handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
+          } else {
+            std::string errormsg = "No such file or directory: " + path;
+            Status statNew = Status::PathNotFound(errormsg.c_str());
+            std::shared_ptr<std::vector<StatInfo>> stat_infos;
+            handler(statNew, stat_infos, false);
+          }
+        } else {
+          std::shared_ptr<std::vector<StatInfo>> stat_infos;
+          handler(stat, stat_infos, false);
+        }
+      });
+}
+
+void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
+  engine_.SetFsEventCallback(callback);
+}
+
+void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
+    hdfs::StatInfo & stat_info,
+    const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
+  stat_info.file_type = fs.filetype();
+  stat_info.length = fs.length();
+  stat_info.permissions = fs.permission().perm();
+  stat_info.owner = fs.owner();
+  stat_info.group = fs.group();
+  stat_info.modification_time = fs.modification_time();
+  stat_info.access_time = fs.access_time();
+  stat_info.symlink = fs.symlink();
+  stat_info.block_replication = fs.block_replication();
+  stat_info.blocksize = fs.blocksize();
+  stat_info.fileid = fs.fileid();
+  stat_info.children_num = fs.childrennum();
+}
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
new file mode 100644
index 0000000000..fafd87442c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_
+#define LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_
+
+#include "rpc/rpc_engine.h"
+#include "hdfspp/statinfo.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+
+namespace hdfs {
+
+/**
+* NameNodeConnection: abstracts the details of communicating with a NameNode
+* and the implementation of the communications protocol.
+*
+* Will eventually handle retry and failover.
+*
+* Threading model: thread-safe; all operations can be called concurrently
+* Lifetime: owned by a FileSystemImpl
+*/
+class NameNodeOperations {
+public:
+  MEMCHECKED_CLASS(NameNodeOperations);
+  NameNodeOperations(::asio::io_service *io_service, const Options &options,
+            const std::string &client_name, const std::string &user_name,
+            const char *protocol_name, int protocol_version) :
+  io_service_(io_service),
+  engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
+  namenode_(& engine_) {}
+
+  void Connect(const std::string &cluster_name,
+               const std::string &server,
+               const std::string &service,
+               std::function<void(const Status &)> &&handler);
+
+  void GetBlockLocations(const std::string & path,
+    std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
+
+  void GetFileInfo(const std::string & path,
+    std::function<void(const Status &, const StatInfo &)> handler);
+
+  // start_after="" for initial call
+  void GetListing(const std::string & path,
+        std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
+        const std::string & start_after = "");
+
+  void SetFsEventCallback(fs_event_callback callback);
+
+private:
+  static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
+  static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
+
+  ::asio::io_service * io_service_;
+  RpcEngine engine_;
+  ClientNamenodeProtocol namenode_;
+};
+}
+
+#endif
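
Usage sketch (illustrative only, not part of the patch): the relocated
NameNodeOperations class keeps its callback-based asynchronous interface;
each operation takes a handler that fires on an io_service thread once the
RPC round trip completes. The example below shows one plausible way to drive
the API end to end. The endpoint ("localhost", "8020"), the cluster, client,
and user names, and the protocol name/version arguments are assumptions made
for the example, not values taken from this patch.

    #include "fs/namenode_operations.h"

    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
      ::asio::io_service io_service;
      // Keep run() from returning while we wait for asynchronous callbacks.
      ::asio::io_service::work work(io_service);
      std::thread worker([&io_service]() { io_service.run(); });

      hdfs::Options options;
      hdfs::NameNodeOperations nn(
          &io_service, options, "example_client", "example_user",
          "org.apache.hadoop.hdfs.protocol.ClientProtocol", 1);

      // Connect is asynchronous; bridge to synchronous code with a promise.
      std::promise<hdfs::Status> connected;
      nn.Connect("example_cluster", "localhost", "8020",
                 [&connected](const hdfs::Status &s) { connected.set_value(s); });

      if (connected.get_future().get().ok()) {
        std::promise<void> done;
        nn.GetFileInfo("/tmp/example",
                       [&done](const hdfs::Status &s, const hdfs::StatInfo &si) {
          if (s.ok())
            std::cout << si.path << " length=" << si.length << std::endl;
          done.set_value();
        });
        done.get_future().wait();
      }

      io_service.stop();
      worker.join();
      return 0;
    }

In the library itself this wiring lives inside FileSystemImpl, which owns the
NameNodeOperations instance and the worker threads; the sketch only makes the
call sequence (construct, Connect, then issue operations) explicit.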