HDFS-10454: libhdfspp: Move NameNodeOp to a separate file. Contributed by Anatoli Shein.

Bob Hansen 2016-06-06 10:45:05 -04:00 committed by James Clampffer
parent 0496bc5464
commit 50aa4ea776
5 changed files with 278 additions and 225 deletions
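
For context, a minimal usage sketch of the NameNodeOperations API that this change moves into its own header — hypothetical and not part of the commit; the include path, host, port, file path, and protocol name/version below are illustrative assumptions based only on the declarations visible in the diff:

// Hypothetical usage sketch; not part of this commit.
#include "namenode_operations.h"   // new header introduced by this change

#include "asio.hpp"
#include <iostream>

using namespace hdfs;

int main() {
  ::asio::io_service io_service;
  Options options;   // default client options

  // Constructor arguments mirror the declaration in namenode_operations.h;
  // the protocol name/version are assumed placeholder values.
  NameNodeOperations namenode_ops(&io_service, options, "example_client", "example_user",
                                  "org.apache.hadoop.hdfs.protocol.ClientProtocol", 1);

  // Resolve and connect to an (assumed) NameNode at localhost:8020, then stat a file.
  namenode_ops.Connect("", "localhost", "8020", [&namenode_ops](const Status &status) {
    if (!status.ok())
      return;
    namenode_ops.GetFileInfo("/tmp/file.txt",
                             [](const Status &stat, const StatInfo &info) {
      if (stat.ok())
        std::cout << "length=" << info.length << std::endl;
    });
  });

  // Drive the asynchronous handlers; a real client runs the io_service on worker threads.
  io_service.run();
  return 0;
}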

View File

@@ -16,6 +16,6 @@
 # limitations under the License.
 #
-add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc)
+add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
 add_dependencies(fs_obj proto)
 add_library(fs $<TARGET_OBJECTS:fs_obj>)

View File

@@ -17,11 +17,6 @@
  */
 #include "filesystem.h"
-#include "common/continuation/asio.h"
-#include "common/util.h"
-#include "common/logging.h"
-#include <asio/ip/tcp.hpp>
 #include <functional>
 #include <limits>
@@ -42,171 +37,6 @@ using ::asio::ip::tcp;
static constexpr uint16_t kDefaultPort = 8020;
/*****************************************************************************
* NAMENODE OPERATIONS
****************************************************************************/
void NameNodeOperations::Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
std::function<void(const Status &)> &&handler) {
using namespace asio_continuation;
typedef std::vector<tcp::endpoint> State;
auto m = Pipeline<State>::Create();
m->Push(Resolve(io_service_, server, service,
std::back_inserter(m->state())))
.Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
engine_.Connect(cluster_name, m->state(), next);
}));
m->Run([this, handler](const Status &status, const State &) {
handler(status);
});
}
void NameNodeOperations::GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
{
using ::hadoop::hdfs::GetBlockLocationsRequestProto;
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
<< FMT_THIS_ADDR << ", path=" << path << ", ...) called");
GetBlockLocationsRequestProto req;
req.set_src(path);
req.set_offset(0);
req.set_length(std::numeric_limits<long long>::max());
auto resp = std::make_shared<GetBlockLocationsResponseProto>();
namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
if (stat.ok()) {
auto file_info = std::make_shared<struct FileInfo>();
auto locations = resp->locations();
file_info->file_length_ = locations.filelength();
file_info->last_block_complete_ = locations.islastblockcomplete();
file_info->under_construction_ = locations.underconstruction();
for (const auto &block : locations.blocks()) {
file_info->blocks_.push_back(block);
}
if (!locations.islastblockcomplete() &&
locations.has_lastblock() && locations.lastblock().b().numbytes()) {
file_info->blocks_.push_back(locations.lastblock());
file_info->file_length_ += locations.lastblock().b().numbytes();
}
handler(stat, file_info);
} else {
handler(stat, nullptr);
}
});
}
void NameNodeOperations::GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler)
{
using ::hadoop::hdfs::GetFileInfoRequestProto;
using ::hadoop::hdfs::GetFileInfoResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
<< FMT_THIS_ADDR << ", path=" << path << ") called");
GetFileInfoRequestProto req;
req.set_src(path);
auto resp = std::make_shared<GetFileInfoResponseProto>();
namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
if (stat.ok()) {
// For non-existant files, the server will respond with an OK message but
// no fs in the protobuf.
if(resp -> has_fs()){
struct StatInfo stat_info;
stat_info.path=path;
HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
handler(stat, stat_info);
} else {
std::string errormsg = "No such file or directory: " + path;
Status statNew = Status::PathNotFound(errormsg.c_str());
handler(statNew, StatInfo());
}
} else {
handler(stat, StatInfo());
}
});
}
void NameNodeOperations::GetListing(
const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
const std::string & start_after) {
using ::hadoop::hdfs::GetListingRequestProto;
using ::hadoop::hdfs::GetListingResponseProto;
LOG_TRACE(
kFileSystem,
<< "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
GetListingRequestProto req;
req.set_src(path);
req.set_startafter(start_after.c_str());
req.set_needlocation(false);
auto resp = std::make_shared<GetListingResponseProto>();
namenode_.GetListing(
&req,
resp,
[resp, handler, path](const Status &stat) {
if (stat.ok()) {
if(resp -> has_dirlist()){
std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
StatInfo si;
si.path=fs.path();
HdfsFileStatusProtoToStatInfo(si, fs);
stat_infos->push_back(si);
}
handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
} else {
std::string errormsg = "No such file or directory: " + path;
Status statNew = Status::PathNotFound(errormsg.c_str());
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(statNew, stat_infos, false);
}
} else {
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(stat, stat_infos, false);
}
});
}
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
engine_.SetFsEventCallback(callback);
}
void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
hdfs::StatInfo & stat_info,
const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
stat_info.file_type = fs.filetype();
stat_info.length = fs.length();
stat_info.permissions = fs.permission().perm();
stat_info.owner = fs.owner();
stat_info.group = fs.group();
stat_info.modification_time = fs.modification_time();
stat_info.access_time = fs.access_time();
stat_info.symlink = fs.symlink();
stat_info.block_replication = fs.block_replication();
stat_info.blocksize = fs.blocksize();
stat_info.fileid = fs.fileid();
stat_info.children_num = fs.childrennum();
}
/*****************************************************************************
 * FILESYSTEM BASE CLASS
 ****************************************************************************/

View File

@@ -19,71 +19,18 @@
 #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 #include "filehandle.h"
-#include "common/libhdfs_events_impl.h"
-#include "common/hdfs_public_api.h"
-#include "common/async_stream.h"
-#include "common/new_delete.h"
 #include "hdfspp/hdfspp.h"
 #include "fs/bad_datanode_tracker.h"
-#include "rpc/rpc_engine.h"
 #include "reader/block_reader.h"
 #include "reader/fileinfo.h"
-#include "hdfspp/statinfo.h"
-#include "ClientNamenodeProtocol.pb.h"
-#include "ClientNamenodeProtocol.hrpc.inl"
 #include "asio.hpp"
 #include <thread>
+#include "namenode_operations.h"
 namespace hdfs {
-/**
- * NameNodeConnection: abstracts the details of communicating with a NameNode
- * and the implementation of the communications protocol.
- *
- * Will eventually handle retry and failover.
- *
- * Threading model: thread-safe; all operations can be called concurrently
- * Lifetime: owned by a FileSystemImpl
- */
-class NameNodeOperations {
-public:
-  MEMCHECKED_CLASS(NameNodeOperations);
-  NameNodeOperations(::asio::io_service *io_service, const Options &options,
-                     const std::string &client_name, const std::string &user_name,
-                     const char *protocol_name, int protocol_version) :
-    io_service_(io_service),
-    engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
-    namenode_(& engine_) {}
-  void Connect(const std::string &cluster_name,
-               const std::string &server,
-               const std::string &service,
-               std::function<void(const Status &)> &&handler);
-  void GetBlockLocations(const std::string & path,
-    std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
-  void GetFileInfo(const std::string & path,
-    std::function<void(const Status &, const StatInfo &)> handler);
-  // start_after="" for initial call
-  void GetListing(const std::string & path,
-                  std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
-                  const std::string & start_after = "");
-  void SetFsEventCallback(fs_event_callback callback);
-private:
-  static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
-  static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
-  ::asio::io_service * io_service_;
-  RpcEngine engine_;
-  ClientNamenodeProtocol namenode_;
-};
 /*
  * FileSystem: The consumer's main point of interaction with the cluster as
  * a whole.

View File

@@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "filesystem.h"
#include "common/continuation/asio.h"
#include <asio/ip/tcp.hpp>
#include <functional>
#include <limits>
#include <future>
#include <tuple>
#include <iostream>
#include <pwd.h>
#define FMT_THIS_ADDR "this=" << (void*)this
using ::asio::ip::tcp;
namespace hdfs {
/*****************************************************************************
* NAMENODE OPERATIONS
****************************************************************************/
void NameNodeOperations::Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
std::function<void(const Status &)> &&handler) {
using namespace asio_continuation;
typedef std::vector<tcp::endpoint> State;
auto m = Pipeline<State>::Create();
m->Push(Resolve(io_service_, server, service,
std::back_inserter(m->state())))
.Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
engine_.Connect(cluster_name, m->state(), next);
}));
m->Run([this, handler](const Status &status, const State &) {
handler(status);
});
}
void NameNodeOperations::GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
{
using ::hadoop::hdfs::GetBlockLocationsRequestProto;
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
<< FMT_THIS_ADDR << ", path=" << path << ", ...) called");
GetBlockLocationsRequestProto req;
req.set_src(path);
req.set_offset(0);
req.set_length(std::numeric_limits<long long>::max());
auto resp = std::make_shared<GetBlockLocationsResponseProto>();
namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
if (stat.ok()) {
auto file_info = std::make_shared<struct FileInfo>();
auto locations = resp->locations();
file_info->file_length_ = locations.filelength();
file_info->last_block_complete_ = locations.islastblockcomplete();
file_info->under_construction_ = locations.underconstruction();
for (const auto &block : locations.blocks()) {
file_info->blocks_.push_back(block);
}
if (!locations.islastblockcomplete() &&
locations.has_lastblock() && locations.lastblock().b().numbytes()) {
file_info->blocks_.push_back(locations.lastblock());
file_info->file_length_ += locations.lastblock().b().numbytes();
}
handler(stat, file_info);
} else {
handler(stat, nullptr);
}
});
}
void NameNodeOperations::GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler)
{
using ::hadoop::hdfs::GetFileInfoRequestProto;
using ::hadoop::hdfs::GetFileInfoResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
<< FMT_THIS_ADDR << ", path=" << path << ") called");
GetFileInfoRequestProto req;
req.set_src(path);
auto resp = std::make_shared<GetFileInfoResponseProto>();
namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
if (stat.ok()) {
// For non-existant files, the server will respond with an OK message but
// no fs in the protobuf.
if(resp -> has_fs()){
struct StatInfo stat_info;
stat_info.path=path;
HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
handler(stat, stat_info);
} else {
std::string errormsg = "No such file or directory: " + path;
Status statNew = Status::PathNotFound(errormsg.c_str());
handler(statNew, StatInfo());
}
} else {
handler(stat, StatInfo());
}
});
}
void NameNodeOperations::GetListing(
const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
const std::string & start_after) {
using ::hadoop::hdfs::GetListingRequestProto;
using ::hadoop::hdfs::GetListingResponseProto;
LOG_TRACE(
kFileSystem,
<< "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
GetListingRequestProto req;
req.set_src(path);
req.set_startafter(start_after.c_str());
req.set_needlocation(false);
auto resp = std::make_shared<GetListingResponseProto>();
namenode_.GetListing(
&req,
resp,
[resp, handler, path](const Status &stat) {
if (stat.ok()) {
if(resp -> has_dirlist()){
std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
StatInfo si;
si.path=fs.path();
HdfsFileStatusProtoToStatInfo(si, fs);
stat_infos->push_back(si);
}
handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
} else {
std::string errormsg = "No such file or directory: " + path;
Status statNew = Status::PathNotFound(errormsg.c_str());
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(statNew, stat_infos, false);
}
} else {
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(stat, stat_infos, false);
}
});
}
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
engine_.SetFsEventCallback(callback);
}
void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
hdfs::StatInfo & stat_info,
const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
stat_info.file_type = fs.filetype();
stat_info.length = fs.length();
stat_info.permissions = fs.permission().perm();
stat_info.owner = fs.owner();
stat_info.group = fs.group();
stat_info.modification_time = fs.modification_time();
stat_info.access_time = fs.access_time();
stat_info.symlink = fs.symlink();
stat_info.block_replication = fs.block_replication();
stat_info.blocksize = fs.blocksize();
stat_info.fileid = fs.fileid();
stat_info.children_num = fs.childrennum();
}
}

View File

@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_
#define LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_
#include "rpc/rpc_engine.h"
#include "hdfspp/statinfo.h"
#include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl"
namespace hdfs {
/**
* NameNodeConnection: abstracts the details of communicating with a NameNode
* and the implementation of the communications protocol.
*
* Will eventually handle retry and failover.
*
* Threading model: thread-safe; all operations can be called concurrently
* Lifetime: owned by a FileSystemImpl
*/
class NameNodeOperations {
public:
MEMCHECKED_CLASS(NameNodeOperations);
NameNodeOperations(::asio::io_service *io_service, const Options &options,
const std::string &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version) :
io_service_(io_service),
engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
namenode_(& engine_) {}
void Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
std::function<void(const Status &)> &&handler);
void GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
void GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler);
// start_after="" for initial call
void GetListing(const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
const std::string & start_after = "");
void SetFsEventCallback(fs_event_callback callback);
private:
static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
::asio::io_service * io_service_;
RpcEngine engine_;
ClientNamenodeProtocol namenode_;
};
}
#endif