HDFS-10754: libhdfs++: Create tools directory and implement hdfs_cat, hdfs_chgrp, hdfs_chown, hdfs_chmod and hdfs_find. Contributed by Anatoli Shein.

This commit is contained in:
James 2016-08-26 12:52:05 -04:00 committed by James Clampffer
parent 05ddb31081
commit 4f6cb5d1a1
31 changed files with 1828 additions and 283 deletions

View File

@ -163,6 +163,7 @@ add_subdirectory(third_party/uriparser2)
add_subdirectory(lib)
add_subdirectory(tests)
add_subdirectory(examples)
add_subdirectory(tools)
# create an empty file; hadoop_add_dual_library wraps add_library which
# requires at least one file as an argument

View File

@ -17,4 +17,5 @@
#
add_subdirectory(cat)
add_subdirectory(gendirs)
add_subdirectory(gendirs)
add_subdirectory(find)

View File

@ -17,105 +17,91 @@
under the License.
*/
/*
A a stripped down version of unix's "cat".
Doesn't deal with any flags for now, will just attempt to read the whole file.
*/
/**
* Unix-like cat tool example.
*
* Reads the specified file from HDFS and outputs to stdout.
*
* Usage: cat /<path-to-file>
*
* Example: cat /dir/file
*
* @param path-to-file Absolute path to the file to read.
*
**/
#include "hdfspp/hdfspp.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include "common/uri.h"
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/stubs/common.h>
using namespace std;
using namespace hdfs;
#define SCHEME "hdfs"
#define BUF_SIZE 4096
int main(int argc, char *argv[]) {
if (argc != 2) {
cerr << "usage: cat [hdfs://[<hostname>:<port>]]/<path-to-file>" << endl;
return 1;
std::cerr << "usage: cat /<path-to-file>" << std::endl;
exit(EXIT_FAILURE);
}
std::string path = argv[1];
optional<URI> uri;
const string uri_path = argv[1];
//Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind
size_t scheme_end = uri_path.find("://");
if (scheme_end != string::npos) {
if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) {
cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl;
return 1;
} else {
uri = URI::parse_from_string(uri_path);
}
}
if (!uri) {
cerr << "Malformed URI: " << uri_path << endl;
return 1;
}
ConfigurationLoader loader;
optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR");
if (envHadoopConfDir && (*envHadoopConfDir != 0) ) {
config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml");
}
Options options;
hdfs::Options options;
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
hdfs::ConfigurationLoader loader;
//Loading default config files core-site.xml and hdfs-site.xml from the config path
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
if(config){
//Loading options from the config
options = config->GetOptions();
}
IoService * io_service = IoService::New();
FileSystem *fs_raw = FileSystem::New(io_service, "", options);
if (!fs_raw) {
cerr << "Could not create FileSystem object" << endl;
return 1;
hdfs::IoService * io_service = hdfs::IoService::New();
//Wrapping fs into a shared pointer to guarantee deletion
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
if (!fs) {
std::cerr << "Could not connect the file system." << std::endl;
exit(EXIT_FAILURE);
}
//wrapping fs_raw into a unique pointer to guarantee deletion
unique_ptr<FileSystem> fs(fs_raw);
Status stat = fs->Connect(uri->get_host(), to_string(*(uri->get_port())));
if (!stat.ok()) {
cerr << "Could not connect to " << uri->get_host() << ":" << *(uri->get_port()) << endl;
return 1;
hdfs::Status status = fs->ConnectToDefaultFs();
if (!status.ok()) {
if(!options.defaultFS.get_host().empty()){
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
} else {
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
}
exit(EXIT_FAILURE);
}
FileHandle *file_raw = nullptr;
stat = fs->Open(uri->get_path(), &file_raw);
if (!stat.ok()) {
cerr << "Could not open file " << uri->get_path() << endl;
return 1;
hdfs::FileHandle *file_raw = nullptr;
status = fs->Open(path, &file_raw);
if (!status.ok()) {
std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
//wrapping file_raw into a unique pointer to guarantee deletion
unique_ptr<FileHandle> file(file_raw);
std::unique_ptr<hdfs::FileHandle> file(file_raw);
char input_buffer[4096];
ssize_t read_bytes_count = 0;
size_t last_read_bytes = 0;
char input_buffer[BUF_SIZE];
ssize_t total_bytes_read = 0;
size_t last_bytes_read = 0;
do{
//Reading file chunks
Status stat = file->PositionRead(input_buffer, sizeof(input_buffer), read_bytes_count, &last_read_bytes);
if(stat.ok()) {
status = file->Read(input_buffer, sizeof(input_buffer), &last_bytes_read);
if(status.ok()) {
//Writing file chunks to stdout
fwrite(input_buffer, last_read_bytes, 1, stdout);
read_bytes_count += last_read_bytes;
fwrite(input_buffer, last_bytes_read, 1, stdout);
total_bytes_read += last_bytes_read;
} else {
if(stat.is_invalid_offset()){
if(status.is_invalid_offset()){
//Reached the end of the file
break;
} else {
cerr << "Error reading the file: " << stat.ToString() << endl;
return 1;
std::cerr << "Error reading the file: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
}
} while (last_read_bytes > 0);
} while (last_bytes_read > 0);
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();

View File

@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default LIBHDFSPP_DIR to the default install location. You can override
# it by add -DLIBHDFSPP_DIR=... to your cmake invocation
set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
include_directories( ${LIBHDFSPP_DIR}/include )
link_directories( ${LIBHDFSPP_DIR}/lib )
add_executable(find_cpp find.cpp)
target_link_libraries(find_cpp hdfspp)
# Several examples in different languages need to produce executables with
# same names. To allow executables with same names we keep their CMake
# names different, but specify their executable names as follows:
set_target_properties( find_cpp
PROPERTIES
OUTPUT_NAME "find"
)

View File

@ -0,0 +1,162 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
/**
* A parallel find tool example.
*
* Finds all files matching the specified name recursively starting from the
* specified directory and prints their filepaths. Works either synchronously
* or asynchronously.
*
* Usage: find /<path-to-file> <file-name> <use_async>
*
* Example: find /dir?/tree* some?file*name 1
*
* @param path-to-file Absolute path at which to begin search, can have wild
* cards and must be non-blank
* @param file-name Name to find, can have wild cards and must be non-blank
* @param use_async If set to 1 it prints out results asynchronously as
* they arrive. If set to 0 results are printed in one
* big chunk when it becomes available.
*
**/
#include "hdfspp/hdfspp.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include <google/protobuf/stubs/common.h>
#include <future>
void SyncFind(std::shared_ptr<hdfs::FileSystem> fs, const std::string &path, const std::string &name){
std::vector<hdfs::StatInfo> results;
//Synchronous call to Find
hdfs::Status stat = fs->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &results);
if (!stat.ok()) {
std::cerr << "Error: " << stat.ToString() << std::endl;
}
if(results.empty()){
std::cout << "Nothing Found" << std::endl;
} else {
//Printing out the results
for (hdfs::StatInfo const& si : results) {
std::cout << si.full_path << std::endl;
}
}
}
void AsyncFind(std::shared_ptr<hdfs::FileSystem> fs, const std::string &path, const std::string &name){
std::promise<void> promise;
std::future<void> future(promise.get_future());
bool something_found = false;
hdfs::Status status = hdfs::Status::OK();
/**
* Keep requesting more until we get the entire listing. Set the promise
* when we have the entire listing to stop.
*
* Find guarantees that the handler will only be called once at a time,
* so we do not need any locking here
*/
auto handler = [&promise, &status, &something_found]
(const hdfs::Status &s, const std::vector<hdfs::StatInfo> & si, bool has_more_results) -> bool {
//Print result chunks as they arrive
if(!si.empty()) {
something_found = true;
for (hdfs::StatInfo const& s : si) {
std::cout << s.full_path << std::endl;
}
}
if(!s.ok() && status.ok()){
//We make sure we set 'status' only on the first error.
status = s;
}
if (!has_more_results) {
promise.set_value(); //set promise
return false; //request stop sending results
}
return true; //request more results
};
//Asynchronous call to Find
fs->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), handler);
//block until promise is set
future.get();
if(!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
}
if(!something_found){
std::cout << "Nothing Found" << std::endl;
}
}
int main(int argc, char *argv[]) {
if (argc != 4) {
std::cerr << "usage: find /<path-to-file> <file-name> <use_async>" << std::endl;
exit(EXIT_FAILURE);
}
std::string path = argv[1];
std::string name = argv[2];
bool use_async = (std::stoi(argv[3]) != 0);
hdfs::Options options;
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
hdfs::ConfigurationLoader loader;
//Loading default config files core-site.xml and hdfs-site.xml from the config path
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
if(config){
//Loading options from the config
options = config->GetOptions();
}
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
options.rpc_timeout = std::numeric_limits<int>::max();
hdfs::IoService * io_service = hdfs::IoService::New();
//Wrapping fs into a unique pointer to guarantee deletion
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
if (!fs) {
std::cerr << "Could not connect the file system." << std::endl;
exit(EXIT_FAILURE);
}
hdfs::Status status = fs->ConnectToDefaultFs();
if (!status.ok()) {
if(!options.defaultFS.get_host().empty()){
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
} else {
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
}
exit(EXIT_FAILURE);
}
if (use_async){
//Example of Async find
AsyncFind(fs, path, name);
} else {
//Example of Sync find
SyncFind(fs, path, name);
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -23,9 +23,9 @@
* Generates a directory tree with specified depth and fanout starting from
* a given path. Generation is asynchronous.
*
* Usage: gendirs [hdfs://[<hostname>:<port>]]/<path-to-dir> <depth> <fanout>
* Usage: gendirs /<path-to-dir> <depth> <fanout>
*
* Example: gendirs hdfs://localhost.localdomain:9433/dir0 3 10
* Example: gendirs /dir0 3 10
*
* @param path-to-dir Absolute path to the directory tree root where the
* directory tree will be generated
@ -37,99 +37,76 @@
**/
#include "hdfspp/hdfspp.h"
#include "fs/namenode_operations.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include "common/uri.h"
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/stubs/common.h>
#include <future>
using namespace std;
using namespace hdfs;
#define DEFAULT_PERMISSIONS 0755
#define SCHEME "hdfs"
void GenerateDirectories (shared_ptr<FileSystem> fs, int depth, int level, int fanout, string path, vector<future<Status>> & futures) {
void GenerateDirectories (std::shared_ptr<hdfs::FileSystem> fs, int depth, int level, int fanout, std::string path, std::vector<std::future<hdfs::Status>> & futures) {
//Level contains our current depth in the directory tree
if(level < depth) {
for(int i = 0; i < fanout; i++){
//Recursive calls to cover all possible paths from the root to the leave nodes
GenerateDirectories(fs, depth, level+1, fanout, path + "dir" + to_string(i) + "/", futures);
GenerateDirectories(fs, depth, level+1, fanout, path + "dir" + std::to_string(i) + "/", futures);
}
} else {
//We have reached the leaf nodes and now start making calls to create directories
//We make a promise which will be set when the call finishes and executes our handler
auto callstate = make_shared<promise<Status>>();
auto callstate = std::make_shared<std::promise<hdfs::Status>>();
//Extract a future from this promise
future<Status> future(callstate->get_future());
std::future<hdfs::Status> future(callstate->get_future());
//Save this future to the vector of futures which will be used to wait on all promises
//after the whole recursion is done
futures.push_back(move(future));
futures.push_back(std::move(future));
//Create a handler that will be executed when Mkdirs is done
auto handler = [callstate](const Status &s) {
auto handler = [callstate](const hdfs::Status &s) {
callstate->set_value(s);
};
//Asynchronous call to create this directory along with all missing parent directories
fs->Mkdirs(path, NameNodeOperations::GetDefaultPermissionMask(), true, handler);
fs->Mkdirs(path, DEFAULT_PERMISSIONS, true, handler);
}
}
int main(int argc, char *argv[]) {
if (argc != 4) {
cerr << "usage: gendirs [hdfs://[<hostname>:<port>]]/<path-to-dir> <depth> <fanout>" << endl;
return 1;
std::cerr << "usage: gendirs /<path-to-dir> <depth> <fanout>" << std::endl;
exit(EXIT_FAILURE);
}
optional<URI> uri;
const string uri_path = argv[1];
const int depth = stoi(argv[2]);
const int fanout = stoi(argv[3]);
std::string path = argv[1];
int depth = std::stoi(argv[2]);
int fanout = std::stoi(argv[3]);
//Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind
size_t scheme_end = uri_path.find("://");
if (scheme_end != string::npos) {
if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) {
cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl;
return 1;
} else {
uri = URI::parse_from_string(uri_path);
}
}
if (!uri) {
cerr << "Malformed URI: " << uri_path << endl;
return 1;
}
ConfigurationLoader loader;
optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR");
if (envHadoopConfDir && (*envHadoopConfDir != 0) ) {
config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml");
}
Options options;
options.rpc_timeout = numeric_limits<int>::max();
hdfs::Options options;
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
hdfs::ConfigurationLoader loader;
//Loading default config files core-site.xml and hdfs-site.xml from the config path
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
if(config){
//Loading options from the config
options = config->GetOptions();
}
IoService * io_service = IoService::New();
FileSystem *fs_raw = FileSystem::New(io_service, "", options);
if (!fs_raw) {
cerr << "Could not create FileSystem object" << endl;
return 1;
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
options.rpc_timeout = std::numeric_limits<int>::max();
hdfs::IoService * io_service = hdfs::IoService::New();
//Wrapping fs into a unique pointer to guarantee deletion
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
if (!fs) {
std::cerr << "Could not connect the file system." << std::endl;
exit(EXIT_FAILURE);
}
//Wrapping fs_raw into a unique pointer to guarantee deletion
shared_ptr<FileSystem> fs(fs_raw);
//Get port from the uri, otherwise use the default port
string port = to_string(uri->get_port().value_or(8020));
Status stat = fs->Connect(uri->get_host(), port);
if (!stat.ok()) {
cerr << "Could not connect to " << uri->get_host() << ":" << port << endl;
return 1;
hdfs::Status status = fs->ConnectToDefaultFs();
if (!status.ok()) {
if(!options.defaultFS.get_host().empty()){
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
} else {
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
}
exit(EXIT_FAILURE);
}
/**
@ -143,22 +120,23 @@ int main(int argc, char *argv[]) {
* processed. After the whole recursion is complete we will need to wait until
* all promises are set before we can exit.
**/
vector<future<Status>> futures;
std::vector<std::future<hdfs::Status>> futures;
GenerateDirectories(fs, depth, 0, fanout, uri->get_path() + "/", futures);
GenerateDirectories(fs, depth, 0, fanout, path + "/", futures);
/**
* We are waiting here until all promises are set, and checking whether
* the returned statuses contained any errors.
**/
for(future<Status> &fs : futures){
Status stat = fs.get();
if (!stat.ok()) {
cerr << "Error: " << stat.ToString() << endl;
for(std::future<hdfs::Status> &fs : futures){
hdfs::Status status = fs.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
}
cout << "All done!" << endl;
std::cout << "All done!" << std::endl;
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();

View File

@ -287,6 +287,20 @@ LIBHDFS_EXTERNAL
int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie);
/**
* Finds file name on the file system. hdfsFreeFileInfo should be called to deallocate memory.
*
* @param fs The filesystem (required)
* @param path Path at which to begin search, can have wild cards (must be non-blank)
* @param name Name to find, can have wild cards (must be non-blank)
* @param numEntries Set to the number of files/directories in the result.
* @return Returns a dynamically-allocated array of hdfsFileInfo
* objects; NULL on error or empty result.
* errno is set to non-zero on error or zero on success.
**/
hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries);
/*****************************************************************************
* HDFS SNAPSHOT FUNCTIONS
****************************************************************************/

View File

@ -138,6 +138,18 @@ public:
**/
class FileSystem {
public:
//Returns the default maximum depth for recursive Find tool
static uint32_t GetDefaultFindMaxDepth();
//Returns the default permission mask
static uint16_t GetDefaultPermissionMask();
//Checks if the given permission mask is valid
static Status CheckValidPermissionMask(uint16_t permissions);
//Checks if replication value is valid
static Status CheckValidReplication(uint16_t replication);
/**
* Create a new instance of the FileSystem object. The call
* initializes the RPC connections to the NameNode and returns an
@ -236,7 +248,7 @@ class FileSystem {
*
* The asynchronous method will return batches of files; the consumer must
* return true if they want more files to be delivered. The final bool
* parameter in the callback will be set to true if this is the final
* parameter in the callback will be set to false if this is the final
* batch of files.
*
* The synchronous method will return all files in the directory.
@ -245,9 +257,8 @@ class FileSystem {
**/
virtual void
GetListing(const std::string &path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) = 0;
virtual Status GetListing(const std::string &path,
std::shared_ptr<std::vector<StatInfo>> & stat_infos) = 0;
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) = 0;
virtual Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) = 0;
/**
* Returns the locations of all known blocks for the indicated file (or part of it), or an error
@ -297,8 +308,8 @@ class FileSystem {
* @param path the path to the file or directory
* @param permissions the bitmask to set it to (should be between 0 and 01777)
*/
virtual void SetPermission(const std::string & path,
uint16_t permissions, const std::function<void(const Status &)> &handler) = 0;
virtual void SetPermission(const std::string & path, uint16_t permissions,
const std::function<void(const Status &)> &handler) = 0;
virtual Status SetPermission(const std::string & path, uint16_t permissions) = 0;
/**
@ -307,12 +318,34 @@ class FileSystem {
* @param path file path
* @param username If it is empty, the original username remains unchanged.
* @param groupname If it is empty, the original groupname remains unchanged.
* @param recursive If true, the change will be propagated recursively.
*/
virtual void SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, const std::function<void(const Status &)> &handler) = 0;
virtual Status SetOwner(const std::string & path,
const std::string & username, const std::string & groupname) = 0;
/**
* Finds all files matching the specified name recursively starting from the
* specified directory. Returns metadata for each of them.
*
* Example: Find("/dir?/tree*", "some?file*name")
*
* @param path Absolute path at which to begin search, can have wild cards (must be non-blank)
* @param name Name to find, can also have wild cards (must be non-blank)
*
* The asynchronous method will return batches of files; the consumer must
* return true if they want more files to be delivered. The final bool
* parameter in the callback will be set to false if this is the final
* batch of files.
*
* The synchronous method will return matching files.
**/
virtual void
Find(const std::string &path, const std::string &name, const uint32_t maxdepth,
const std::function<bool(const Status &, const std::vector<StatInfo> & , bool)> &handler) = 0;
virtual Status Find(const std::string &path, const std::string &name,
const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) = 0;
/*****************************************************************************

View File

@ -33,6 +33,7 @@ struct StatInfo {
int file_type;
::std::string path;
::std::string full_path;
unsigned long int length;
unsigned long int permissions; //Octal number as in POSIX permissions; e.g. 0777
::std::string owner;

View File

@ -731,22 +731,21 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
if(!abs_path) {
return nullptr;
}
std::shared_ptr<std::vector<StatInfo>> stat_infos;
Status stat = fs->get_impl()->GetListing(*abs_path, stat_infos);
std::vector<StatInfo> stat_infos;
Status stat = fs->get_impl()->GetListing(*abs_path, &stat_infos);
if (!stat.ok()) {
Error(stat);
*numEntries = 0;
return nullptr;
}
//Existing API expects nullptr if size is 0
if(!stat_infos || stat_infos->size()==0){
if(stat_infos.empty()){
*numEntries = 0;
return nullptr;
}
*numEntries = stat_infos->size();
hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos->size()];
for(std::vector<StatInfo>::size_type i = 0; i < stat_infos->size(); i++) {
StatInfoToHdfsFileInfo(&file_infos[i], stat_infos->at(i));
*numEntries = stat_infos.size();
hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()];
for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) {
StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i));
}
return file_infos;
@ -785,7 +784,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) {
}
Status stat;
//Use default permissions and set true for creating all non-existant parent directories
stat = fs->get_impl()->Mkdirs(*abs_path, NameNodeOperations::GetDefaultPermissionMask(), true);
stat = fs->get_impl()->Mkdirs(*abs_path, FileSystem::GetDefaultPermissionMask(), true);
if (!stat.ok()) {
return Error(stat);
}
@ -854,7 +853,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){
if(!abs_path) {
return -1;
}
Status stat = NameNodeOperations::CheckValidPermissionMask(mode);
Status stat = FileSystem::CheckValidPermissionMask(mode);
if (!stat.ok()) {
return Error(stat);
}
@ -896,6 +895,44 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
}
}
hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){
try {
errno = 0;
if (!CheckSystem(fs)) {
*numEntries = 0;
return nullptr;
}
std::vector<StatInfo> stat_infos;
Status stat = fs->get_impl()->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &stat_infos);
if (!stat.ok()) {
Error(stat);
*numEntries = 0;
return nullptr;
}
//Existing API expects nullptr if size is 0
if(stat_infos.empty()){
*numEntries = 0;
return nullptr;
}
*numEntries = stat_infos.size();
hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()];
for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) {
StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i));
}
return file_infos;
} catch (const std::exception & e) {
ReportException(e);
*numEntries = 0;
return nullptr;
} catch (...) {
ReportCaughtNonException();
*numEntries = 0;
return nullptr;
}
}
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
try {
errno = 0;
@ -1373,19 +1410,18 @@ HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
}
else
{
return loader.New<HdfsConfiguration>();
return loader.NewConfig<HdfsConfiguration>();
}
}
hdfsBuilder::hdfsBuilder() : config(loader.New<HdfsConfiguration>())
hdfsBuilder::hdfsBuilder() : config(loader.NewConfig<HdfsConfiguration>())
{
errno = 0;
loader.SetDefaultSearchPath();
config = LoadDefault(loader);
}
hdfsBuilder::hdfsBuilder(const char * directory) :
config(loader.New<HdfsConfiguration>())
config(loader.NewConfig<HdfsConfiguration>())
{
errno = 0;
loader.SetSearchPath(directory);

View File

@ -62,6 +62,12 @@ bool str_to_bool(const std::string& raw) {
return false;
}
ConfigurationLoader::ConfigurationLoader() {
//In order to creates a configuration loader with the default search path
//("$HADOOP_CONF_DIR" or "/etc/hadoop/conf") we call SetDefaultSearchPath().
ConfigurationLoader::SetDefaultSearchPath();
}
void ConfigurationLoader::SetDefaultSearchPath() {
// Try (in order, taking the first valid one):
// $HADOOP_CONF_DIR
@ -257,4 +263,4 @@ bool ConfigurationLoader::UpdateMapWithValue(ConfigMap& map,
return true;
}
}
}

View File

@ -27,9 +27,9 @@ namespace hdfs {
class ConfigurationLoader {
public:
// Creates a new, empty Configuration object
// T must be Configuration or a subclass
// T must be Configuration or a subclass
template<class T>
T New();
T NewConfig();
/****************************************************************************
* LOADING CONFIG FILES
@ -79,6 +79,10 @@ public:
* SEARCH PATH METHODS
***************************************************************************/
//Creates a configuration loader with the default search path ("$HADOOP_CONF_DIR" or "/etc/hadoop/conf").
//If you want to explicitly set the entire search path, call ClearSearchPath() first
ConfigurationLoader();
// Sets the search path to the default search path (namely, "$HADOOP_CONF_DIR" or "/etc/hadoop/conf")
void SetDefaultSearchPath();

View File

@ -23,7 +23,7 @@ namespace hdfs {
template<class T>
T ConfigurationLoader::New() {
T ConfigurationLoader::NewConfig() {
return T();
}

View File

@ -26,6 +26,7 @@
#include <tuple>
#include <iostream>
#include <pwd.h>
#include <fnmatch.h>
#define FMT_THIS_ADDR "this=" << (void*)this
@ -39,6 +40,34 @@ using ::asio::ip::tcp;
static constexpr uint16_t kDefaultPort = 8020;
uint32_t FileSystem::GetDefaultFindMaxDepth() {
return std::numeric_limits<uint32_t>::max();
}
uint16_t FileSystem::GetDefaultPermissionMask() {
return 0755;
}
Status FileSystem::CheckValidPermissionMask(uint16_t permissions) {
if (permissions > 01777) {
std::stringstream errormsg;
errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
<< std::showbase << permissions << " (should be between 0 and 01777)";
return Status::InvalidArgument(errormsg.str().c_str());
}
return Status::OK();
}
Status FileSystem::CheckValidReplication(uint16_t replication) {
if (replication < 1 || replication > 512) {
std::stringstream errormsg;
errormsg << "CheckValidReplication: argument 'replication' is "
<< replication << " (should be between 1 and 512)";
return Status::InvalidArgument(errormsg.str().c_str());
}
return Status::OK();
}
/*****************************************************************************
* FILESYSTEM BASE CLASS
****************************************************************************/
@ -446,7 +475,7 @@ void FileSystemImpl::SetReplication(const std::string & path, int16_t replicatio
handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
return;
}
Status replStatus = NameNodeOperations::CheckValidReplication(replication);
Status replStatus = FileSystem::CheckValidReplication(replication);
if (!replStatus.ok()) {
handler(replStatus);
return;
@ -593,44 +622,43 @@ Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
* Some compilers don't like recursive lambdas, so we make the lambda call a
* method, which in turn creates a lambda calling itself.
*/
void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
std::string path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
bool has_next = stat_infos && stat_infos->size() > 0;
void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
bool has_next = !stat_infos.empty();
bool get_more = handler(stat, stat_infos, has_more && has_next);
if (get_more && has_more && has_next ) {
auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
GetListingShim(stat, stat_infos, has_more, path, handler);
};
std::string last = stat_infos->back().path;
std::string last = stat_infos.back().path;
nn_.GetListing(path, callback, last);
}
}
void FileSystemImpl::GetListing(
const std::string &path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
// Caputure the state and push it into the shim
auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
GetListingShim(stat, stat_infos, has_more, path, handler);
};
nn_.GetListing(path, callback);
}
Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) {
Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
// In this case, we're going to allocate the result on the heap and have the
// async code populate it.
auto results = std::make_shared<std::vector<StatInfo>>();
if (!stat_infos) {
return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
}
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
@ -640,9 +668,9 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::
Keep requesting more until we get the entire listing, and don't set the promise
* until we have the entire listing.
*/
auto h = [callstate, results](const Status &s, std::shared_ptr<std::vector<StatInfo>> si, bool has_more) -> bool {
if (si) {
results->insert(results->end(), si->begin(), si->end());
auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
if (!si.empty()) {
stat_infos->insert(stat_infos->end(), si.begin(), si.end());
}
bool done = !s.ok() || !has_more;
@ -658,11 +686,6 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::
/* block until promise is set */
Status stat = future.get();
if (!stat.ok()) {
return stat;
}
stat_infos = results;
return stat;
}
@ -677,7 +700,7 @@ void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool
return;
}
Status permStatus = NameNodeOperations::CheckValidPermissionMask(permissions);
Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
@ -790,7 +813,7 @@ void FileSystemImpl::SetPermission(const std::string & path,
handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
return;
}
Status permStatus = NameNodeOperations::CheckValidPermissionMask(permissions);
Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
@ -832,8 +855,8 @@ void FileSystemImpl::SetOwner(const std::string & path, const std::string & user
nn_.SetOwner(path, username, groupname, handler);
}
Status FileSystemImpl::SetOwner(const std::string & path,
const std::string & username, const std::string & groupname) {
Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
const std::string & groupname) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
@ -849,10 +872,179 @@ Status FileSystemImpl::SetOwner(const std::string & path,
/* block until promise is set */
Status stat = future.get();
return stat;
}
/**
* Helper function for recursive Find calls.
*
* Some compilers don't like recursive lambdas, so we make the lambda call a
* method, which in turn creates a lambda calling itself.
*
* ***High-level explanation***
*
* Since we are allowing to use wild cards in both path and name, we start by expanding the path first.
* Boolean search_path is set to true when we search for the path and false when we search for the name.
* When we search for the path we break the given path pattern into sub-directories. Starting from the
* first sub-directory we list them one-by-one and recursively continue into directories that matched the
* path pattern at the current depth. Directories that are large will be requested to continue sending
* the results. We keep track of the current depth within the path pattern in the 'depth' variable.
* This continues recursively until the depth reaches the end of the path. Next that we start matching
* the name pattern. All directories that we find we recurse now, and all names that match the given name
* pattern are being stored in outputs and later sent back to the user.
*/
void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more,
std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) {
//We buffer the outputs then send them back at the end
std::vector<StatInfo> outputs;
//Return on error
if(!stat.ok()){
std::lock_guard<std::mutex> find_lock(shared_state->lock);
//We send true becuase we do not want the user code to exit before all our requests finished
shared_state->handler(stat, outputs, true);
shared_state->aborted = true;
}
if(!shared_state->aborted){
//User did not abort the operation
if (directory_has_more) {
//Directory is large and has more results
//We launch another async call to get more results
shared_state->outstanding_requests++;
auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
FindShim(stat, stat_infos, has_more, operational_state, shared_state);
};
std::string last = stat_infos.back().path;
nn_.GetListing(operational_state->path, callback, last);
}
if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){
//We are searching for the path and did not reach the end of the path yet
for (StatInfo const& si : stat_infos) {
//If we are at the last depth and it matches both path and name, we need to output it.
if (operational_state->depth == shared_state->dirs.size() - 2
&& !fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)
&& !fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)) {
outputs.push_back(si);
}
//Skip if not directory
if(si.file_type != StatInfo::IS_DIR) {
continue;
}
//Checking for a match with the path at the current depth
if(!fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)){
//Launch a new requests for every matched directory
shared_state->outstanding_requests++;
auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true); //true because searching for the path
FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
};
nn_.GetListing(si.full_path, callback);
}
}
}
else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){
//We are searching for the name now and maxdepth has not been reached
for (StatInfo const& si : stat_infos) {
//Launch a new request for every directory
if(si.file_type == StatInfo::IS_DIR) {
shared_state->outstanding_requests++;
auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false); //false because searching for the name
FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
};
nn_.GetListing(si.full_path, callback);
}
//All names that match the specified name are saved to outputs
if(!fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)){
outputs.push_back(si);
}
}
}
}
//This section needs a lock to make sure we return the final chunk only once
//and no results are sent after aborted is set
std::lock_guard<std::mutex> find_lock(shared_state->lock);
//Decrement the counter once since we are done with this chunk
shared_state->outstanding_requests--;
if(shared_state->outstanding_requests == 0){
//Send the outputs back to the user and notify that this is the final chunk
shared_state->handler(stat, outputs, false);
} else {
//There will be more results and we are not aborting
if (outputs.size() > 0 && !shared_state->aborted){
//Send the outputs back to the user and notify that there is more
bool user_wants_more = shared_state->handler(stat, outputs, true);
if(!user_wants_more) {
//Abort if user doesn't want more
shared_state->aborted = true;
}
}
}
}
void FileSystemImpl::Find(
const std::string &path, const std::string &name, const uint32_t maxdepth,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
LOG_INFO(kFileSystem, << "FileSystemImpl::Find("
<< FMT_THIS_ADDR << ", path="
<< path << ", name="
<< name << ") called");
//Populating the operational state, which includes:
//current search path, depth within the path, and the indication that we are currently searching for a path (not name yet).
std::shared_ptr<FindOperationalState> operational_state = std::make_shared<FindOperationalState>(path, 0, true);
//Populating the shared state, which includes:
//vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag.
std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false);
auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) {
FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state);
};
nn_.GetListing("/", callback);
}
Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Find("
<< FMT_THIS_ADDR << ", path="
<< path << ", name="
<< name << ") called");
if (!stat_infos) {
return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
}
// In this case, we're going to have the async code populate stat_infos.
std::promise<void> promise = std::promise<void>();
std::future<void> future(promise.get_future());
Status status = Status::OK();
/**
* Keep requesting more until we get the entire listing. Set the promise
* when we have the entire listing to stop.
*
* Find guarantees that the handler will only be called once at a time,
* so we do not need any locking here
*/
auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
if (!si.empty()) {
stat_infos->insert(stat_infos->end(), si.begin(), si.end());
}
if (!s.ok() && status.ok()){
//We make sure we set 'status' only on the first error.
status = s;
}
if (!has_more_results) {
promise.set_value();
return false;
}
return true;
};
Find(path, name, maxdepth, h);
/* block until promise is set */
future.get();
return status;
}
void FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name,

View File

@ -94,9 +94,9 @@ public:
void GetListing(
const std::string &path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) override;
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override;
Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override;
virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
@ -115,8 +115,8 @@ public:
const std::function<void(const Status &)> &handler) override;
virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
virtual void SetPermission(const std::string & path,
uint16_t permissions, const std::function<void(const Status &)> &handler) override;
virtual void SetPermission(const std::string & path, uint16_t permissions,
const std::function<void(const Status &)> &handler) override;
virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
virtual void SetOwner(const std::string & path, const std::string & username,
@ -124,6 +124,11 @@ public:
virtual Status SetOwner(const std::string & path,
const std::string & username, const std::string & groupname) override;
void Find(
const std::string &path, const std::string &name, const uint32_t maxdepth,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override;
/*****************************************************************************
* FILE SYSTEM SNAPSHOT FUNCTIONS
****************************************************************************/
@ -204,9 +209,58 @@ private:
**/
std::shared_ptr<LibhdfsEvents> event_handlers_;
void GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
std::string path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler);
void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler);
struct FindSharedState {
//Name pattern (can have wild-cards) to find
const std::string name;
//Maximum depth to recurse after the end of path is reached.
//Can be set to 0 for pure path globbing and ignoring name pattern entirely.
const uint32_t maxdepth;
//Vector of all sub-directories from the path argument (each can have wild-cards)
std::vector<std::string> dirs;
//Callback from Find
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
//outstanding_requests is incremented once for every GetListing call.
std::atomic<uint64_t> outstanding_requests;
//Boolean needed to abort all recursion on error or on user command
std::atomic<bool> aborted;
//Shared variables will need protection with a lock
std::mutex lock;
FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
uint64_t outstanding_recuests_, bool aborted_)
: name(name_),
maxdepth(maxdepth_),
handler(handler_),
outstanding_requests(outstanding_recuests_),
aborted(aborted_),
lock() {
//Constructing the list of sub-directories
std::stringstream ss(path_);
if(path_.back() != '/'){
ss << "/";
}
for (std::string token; std::getline(ss, token, '/'); ) {
dirs.push_back(token);
}
}
};
struct FindOperationalState {
const std::string path;
const uint32_t depth;
const bool search_path;
FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
: path(path_),
depth(depth_),
search_path(search_path_) {
}
};
void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state);
};
}

View File

@ -39,30 +39,6 @@ namespace hdfs {
* NAMENODE OPERATIONS
****************************************************************************/
uint16_t NameNodeOperations::GetDefaultPermissionMask() {
return 0755;
}
Status NameNodeOperations::CheckValidPermissionMask(uint16_t permissions) {
if (permissions > 01777) {
std::stringstream errormsg;
errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
<< std::showbase << permissions << " (should be between 0 and 01777)";
return Status::InvalidArgument(errormsg.str().c_str());
}
return Status::OK();
}
Status NameNodeOperations::CheckValidReplication(uint16_t replication) {
if (replication < 1 || replication > 512) {
std::stringstream errormsg;
errormsg << "CheckValidReplication: argument 'replication' is "
<< replication << " (should be between 1 and 512)";
return Status::InvalidArgument(errormsg.str().c_str());
}
return Status::OK();
}
void NameNodeOperations::Connect(const std::string &cluster_name,
const std::vector<ResolvedNamenodeInfo> &servers,
std::function<void(const Status &)> &&handler) {
@ -170,7 +146,7 @@ void NameNodeOperations::SetReplication(const std::string & path, int16_t replic
handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
return;
}
Status replStatus = CheckValidReplication(replication);
Status replStatus = FileSystemImpl::CheckValidReplication(replication);
if (!replStatus.ok()) {
handler(replStatus);
return;
@ -252,7 +228,8 @@ void NameNodeOperations::GetFileInfo(const std::string & path,
// no fs in the protobuf.
if(resp -> has_fs()){
struct StatInfo stat_info;
stat_info.path=path;
stat_info.path = path;
stat_info.full_path = path;
HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
handler(stat, stat_info);
} else {
@ -290,7 +267,7 @@ void NameNodeOperations::GetFsStats(
void NameNodeOperations::GetListing(
const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler,
const std::string & start_after) {
using ::hadoop::hdfs::GetListingRequestProto;
using ::hadoop::hdfs::GetListingResponseProto;
@ -300,8 +277,8 @@ void NameNodeOperations::GetListing(
<< "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
if (path.empty()) {
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), stat_infos, false);
std::vector<StatInfo> empty;
handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false);
return;
}
@ -312,31 +289,26 @@ void NameNodeOperations::GetListing(
auto resp = std::make_shared<GetListingResponseProto>();
namenode_.GetListing(
&req,
resp,
[resp, handler, path](const Status &stat) {
if (stat.ok()) {
if(resp -> has_dirlist()){
std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
StatInfo si;
si.path=fs.path();
HdfsFileStatusProtoToStatInfo(si, fs);
stat_infos->push_back(si);
}
handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
} else {
std::string errormsg = "No such file or directory: " + path;
Status statNew = Status::PathNotFound(errormsg.c_str());
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(statNew, stat_infos, false);
}
} else {
std::shared_ptr<std::vector<StatInfo>> stat_infos;
handler(stat, stat_infos, false);
namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) {
std::vector<StatInfo> stat_infos;
if (stat.ok()) {
if(resp -> has_dirlist()){
for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
StatInfo si;
si.path = fs.path();
si.full_path = path + fs.path() + "/";
HdfsFileStatusProtoToStatInfo(si, fs);
stat_infos.push_back(si);
}
});
handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
} else {
std::string errormsg = "No such file or directory: " + path;
handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false);
}
} else {
handler(stat, stat_infos, false);
}
});
}
void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
@ -355,7 +327,7 @@ void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions,
}
MkdirsRequestProto req;
Status permStatus = CheckValidPermissionMask(permissions);
Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
@ -471,7 +443,7 @@ void NameNodeOperations::SetPermission(const std::string & path,
handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
return;
}
Status permStatus = CheckValidPermissionMask(permissions);
Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;

View File

@ -48,12 +48,6 @@ public:
engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
namenode_(& engine_), options_(options) {}
static uint16_t GetDefaultPermissionMask();
static Status CheckValidPermissionMask(uint16_t permissions);
static Status CheckValidReplication(uint16_t replication);
void Connect(const std::string &cluster_name,
const std::vector<ResolvedNamenodeInfo> &servers,
std::function<void(const Status &)> &&handler);
@ -77,7 +71,7 @@ public:
// start_after="" for initial call
void GetListing(const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler,
const std::string & start_after = "");
void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,

View File

@ -33,21 +33,27 @@ TEST(ConfigurationTest, TestDegenerateInputs) {
/* Completely empty stream */
{
std::stringstream stream;
optional<Configuration> config = ConfigurationLoader().Load<Configuration>("");
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>("");
EXPECT_FALSE(config && "Empty stream");
}
/* No values */
{
std::string data = "<configuration></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(data);
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(data);
EXPECT_TRUE(config && "Blank config");
}
/* Extraneous values */
{
std::string data = "<configuration><spam></spam></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(data);
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(data);
EXPECT_TRUE(config && "Extraneous values");
}
}
@ -57,7 +63,9 @@ TEST(ConfigurationTest, TestBasicOperations) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "value1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
}
@ -74,7 +82,9 @@ TEST(ConfigurationTest, TestBasicOperations) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "value1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
EXPECT_EQ("value1", config->GetWithDefault("KEY1", ""));
}
@ -83,7 +93,9 @@ TEST(ConfigurationTest, TestBasicOperations) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "value1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
optional<std::string> value = config->Get("key1");
EXPECT_TRUE((bool)value);
@ -97,7 +109,9 @@ TEST(ConfigurationTest, TestCompactValues) {
std::stringstream stream;
stream << "<configuration><property name=\"key1\" "
"value=\"value1\"/></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Compact value parse");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
}
@ -108,7 +122,9 @@ TEST(ConfigurationTest, TestMultipleResources) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "value1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -129,7 +145,9 @@ TEST(ConfigurationTest, TestStringResource) {
simpleConfigStream(stream, "key1", "value1");
std::string str = stream.str();
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
}
@ -171,7 +189,9 @@ TEST(ConfigurationTest, TestFinal) {
std::stringstream stream;
stream << "<configuration><property><name>key1</name><value>value1</"
"value><final>false</final></property></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -187,7 +207,9 @@ TEST(ConfigurationTest, TestFinal) {
std::stringstream stream;
stream << "<configuration><property><name>key1</name><value>value1</"
"value><final>true</final></property></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -203,7 +225,9 @@ TEST(ConfigurationTest, TestFinal) {
std::stringstream stream;
stream << "<configuration><property name=\"key1\" value=\"value1\" "
"final=\"false\"/></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -219,7 +243,9 @@ TEST(ConfigurationTest, TestFinal) {
std::stringstream stream;
stream << "<configuration><property name=\"key1\" value=\"value1\" "
"final=\"true\"/></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -235,7 +261,9 @@ TEST(ConfigurationTest, TestFinal) {
std::stringstream stream;
stream << "<configuration><property><name>key1</name><value>value1</"
"value><final>spam</final></property></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -251,7 +279,9 @@ TEST(ConfigurationTest, TestFinal) {
std::stringstream stream;
stream << "<configuration><property><name>key1</name><value>value1</"
"value><final></final></property></configuration>";
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
@ -271,7 +301,9 @@ TEST(ConfigurationTest, TestFileReads)
TempFile tempFile;
writeSimpleConfig(tempFile.filename, "key1", "value1");
optional<Configuration> config = ConfigurationLoader().LoadFromFile<Configuration>(tempFile.filename);
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.LoadFromFile<Configuration>(tempFile.filename);
EXPECT_TRUE(config && "Parse first stream");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
}
@ -298,7 +330,9 @@ TEST(ConfigurationTest, TestFileReads)
{
TempDir tempDir;
optional<Configuration> config = ConfigurationLoader().LoadFromFile<Configuration>(tempDir.path);
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.LoadFromFile<Configuration>(tempDir.path);
EXPECT_FALSE(config && "Add directory as file resource");
}
@ -359,7 +393,9 @@ TEST(ConfigurationTest, TestIntConversions) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
optional<int64_t> value = config->GetInt("key1");
EXPECT_TRUE((bool)value);
@ -398,7 +434,9 @@ TEST(ConfigurationTest, TestDoubleConversions) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
optional<double> value = config->GetDouble("key1");
EXPECT_TRUE((bool)value);
@ -441,7 +479,9 @@ TEST(ConfigurationTest, TestBoolConversions) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "true");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
optional<bool> value = config->GetBool("key1");
EXPECT_TRUE((bool)value);
@ -488,7 +528,9 @@ TEST(ConfigurationTest, TestUriConversions) {
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "hdfs:///");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> config = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
optional<URI> value = config->GetUri("key1");
EXPECT_TRUE((bool)value);

View File

@ -54,7 +54,9 @@ template <typename... Args>
optional<Configuration> simpleConfig(Args... args) {
std::stringstream stream;
simpleConfigStream(stream, args...);
optional<Configuration> parse = ConfigurationLoader().Load<Configuration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<Configuration> parse = config_loader.Load<Configuration>(stream.str());
EXPECT_TRUE((bool)parse);
return parse;

View File

@ -30,7 +30,9 @@ TEST(HdfsConfigurationTest, TestDefaultOptions)
{
// Completely empty stream
{
HdfsConfiguration empty_config = ConfigurationLoader().New<HdfsConfiguration>();
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
HdfsConfiguration empty_config = config_loader.NewConfig<HdfsConfiguration>();
Options options = empty_config.GetOptions();
EXPECT_EQ(Options::kDefaultRpcTimeout, options.rpc_timeout);
}
@ -49,8 +51,9 @@ TEST(HdfsConfigurationTest, TestSetOptions)
HdfsConfiguration::kIpcClientConnectTimeoutKey, 103,
HdfsConfiguration::kHadoopSecurityAuthenticationKey, HdfsConfiguration::kHadoopSecurityAuthentication_kerberos
);
optional<HdfsConfiguration> config = ConfigurationLoader().Load<HdfsConfiguration>(stream.str());
ConfigurationLoader config_loader;
config_loader.ClearSearchPath();
optional<HdfsConfiguration> config = config_loader.Load<HdfsConfiguration>(stream.str());
EXPECT_TRUE(config && "Read stream");
Options options = config->GetOptions();

View File

@ -492,6 +492,10 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations) {
return libhdfspp_hdfsFreeBlockLocations(locations);
}
hdfsFileInfo *hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t *numEntries) {
return (hdfsFileInfo *)libhdfspp_hdfsFind(fs->libhdfsppRep, path, name, numEntries);
}
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
return libhdfspp_hdfsCreateSnapshot(fs->libhdfsppRep, path, name);
}

View File

@ -94,6 +94,7 @@
#undef hdfsCancel
#undef hdfsGetBlockLocations
#undef hdfsFreeBlockLocations
#undef hdfsFind
#undef hdfsCreateSnapshot
#undef hdfsDeleteSnapshot
#undef hdfsAllowSnapshot

View File

@ -94,6 +94,7 @@
#define hdfsCancel libhdfspp_hdfsCancel
#define hdfsGetBlockLocations libhdfspp_hdfsGetBlockLocations
#define hdfsFreeBlockLocations libhdfspp_hdfsFreeBlockLocations
#define hdfsFind libhdfspp_hdfsFind
#define hdfsCreateSnapshot libhdfspp_hdfsCreateSnapshot
#define hdfsDeleteSnapshot libhdfspp_hdfsDeleteSnapshot
#define hdfsAllowSnapshot libhdfspp_hdfsAllowSnapshot

View File

@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default LIBHDFSPP_DIR to the default install location. You can override
# it by add -DLIBHDFSPP_DIR=... to your cmake invocation
set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
include_directories( ${LIBHDFSPP_DIR}/include )
link_directories( ${LIBHDFSPP_DIR}/lib )
add_library(tools_common_obj OBJECT tools_common.cpp)
add_library(tools_common $<TARGET_OBJECTS:tools_common_obj>)
add_executable(hdfs_cat hdfs_cat.cpp)
target_link_libraries(hdfs_cat tools_common hdfspp)
add_executable(hdfs_chgrp hdfs_chgrp.cpp)
target_link_libraries(hdfs_chgrp tools_common hdfspp)
add_executable(hdfs_chown hdfs_chown.cpp)
target_link_libraries(hdfs_chown tools_common hdfspp)
add_executable(hdfs_chmod hdfs_chmod.cpp)
target_link_libraries(hdfs_chmod tools_common hdfspp)
add_executable(hdfs_find hdfs_find.cpp)
target_link_libraries(hdfs_find tools_common hdfspp)

View File

@ -0,0 +1,120 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_cat [OPTION] FILE"
<< std::endl
<< std::endl << "Concatenate FILE to standard output."
<< std::endl
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_cat hdfs://localhost.localdomain:9433/dir/file"
<< std::endl << "hdfs_cat /dir/file"
<< std::endl;
}
#define BUF_SIZE 4096
int main(int argc, char *argv[]) {
if (argc != 2) {
usage();
exit(EXIT_FAILURE);
}
int input;
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "h")) != -1) {
switch (input)
{
case 'h':
usage();
exit(EXIT_SUCCESS);
break;
case '?':
if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string uri_path = argv[optind];
//Building a URI object from the given uri_path
hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path);
if (!uri) {
std::cerr << "Malformed URI: " << uri_path << std::endl;
exit(EXIT_FAILURE);
}
//TODO: HDFS-9539 Currently options can be returned empty
hdfs::Options options = *hdfs::getOptions();
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
hdfs::FileHandle *file_raw = nullptr;
hdfs::Status status = fs->Open(uri->get_path(), &file_raw);
if (!status.ok()) {
std::cerr << "Could not open file " << uri->get_path() << ". " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
//wrapping file_raw into a unique pointer to guarantee deletion
std::unique_ptr<hdfs::FileHandle> file(file_raw);
char input_buffer[BUF_SIZE];
ssize_t total_bytes_read = 0;
size_t last_bytes_read = 0;
do{
//Reading file chunks
status = file->Read(input_buffer, sizeof(input_buffer), &last_bytes_read);
if(status.ok()) {
//Writing file chunks to stdout
fwrite(input_buffer, last_bytes_read, 1, stdout);
total_bytes_read += last_bytes_read;
} else {
if(status.is_invalid_offset()){
//Reached the end of the file
break;
} else {
std::cerr << "Error reading the file: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
}
} while (last_bytes_read > 0);
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -0,0 +1,196 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include <future>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_chgrp [OPTION] GROUP FILE"
<< std::endl
<< std::endl << "Change the group association of each FILE to GROUP."
<< std::endl << "The user must be the owner of files. Additional information is in the Permissions Guide:"
<< std::endl << "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html"
<< std::endl
<< std::endl << " -R operate on files and directories recursively"
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_chgrp -R new_group hdfs://localhost.localdomain:9433/dir/file"
<< std::endl << "hdfs_chgrp new_group /dir/file"
<< std::endl;
}
struct SetOwnerState {
const std::string username;
const std::string groupname;
const std::function<void(const hdfs::Status &)> handler;
//The request counter is incremented once every time SetOwner async call is made
uint64_t request_counter;
//This boolean will be set when find returns the last result
bool find_is_done;
//Final status to be returned
hdfs::Status status;
//Shared variables will need protection with a lock
std::mutex lock;
SetOwnerState(const std::string & username_, const std::string & groupname_,
const std::function<void(const hdfs::Status &)> & handler_,
uint64_t request_counter_, bool find_is_done_)
: username(username_),
groupname(groupname_),
handler(handler_),
request_counter(request_counter_),
find_is_done(find_is_done_),
status(),
lock() {
}
};
int main(int argc, char *argv[]) {
//We should have 3 or 4 parameters
if (argc != 3 && argc != 4) {
usage();
exit(EXIT_FAILURE);
}
bool recursive = false;
int input;
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "Rh")) != -1) {
switch (input)
{
case 'R':
recursive = 1;
break;
case 'h':
usage();
exit(EXIT_SUCCESS);
break;
case '?':
if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string group = argv[optind];
//Owner stays the same, just group association changes.
std::string owner = "";
std::string uri_path = argv[optind + 1];
//Building a URI object from the given uri_path
hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path);
if (!uri) {
std::cerr << "Malformed URI: " << uri_path << std::endl;
exit(EXIT_FAILURE);
}
//TODO: HDFS-9539 Currently options can be returned empty
hdfs::Options options = *hdfs::getOptions();
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
options.rpc_timeout = std::numeric_limits<int>::max();
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
/* wrap async FileSystem::SetOwner with promise to make it a blocking call */
std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
std::future<hdfs::Status> future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) {
promise->set_value(s);
};
if(!recursive){
fs->SetOwner(uri->get_path(), owner, group, handler);
}
else {
//Allocating shared state, which includes:
//username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done
std::shared_ptr<SetOwnerState> state = std::make_shared<SetOwnerState>(owner, group, handler, 0, false);
// Keep requesting more from Find until we process the entire listing. Call handler when Find is done and reques counter is 0.
// Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
//For each result returned by Find we call async SetOwner with the handler below.
//SetOwner DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetOwner.
auto handlerSetOwner = [state](const hdfs::Status &status_set_owner) {
std::lock_guard<std::mutex> guard(state->lock);
//Decrement the counter once since we are done with this async call
if (!status_set_owner.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_set_owner;
}
state->request_counter--;
if(state->request_counter == 0 && state->find_is_done){
state->handler(state->status); //exit
}
};
if(!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const& s : stat_infos) {
//Launch an asynchronous call to SetOwner for every returned result
state->request_counter++;
fs->SetOwner(s.full_path, state->username, state->groupname, handlerSetOwner);
}
}
//Lock this section because handlerSetOwner might be accessing the same
//shared variables simultaneously
std::lock_guard<std::mutex> guard(state->lock);
if (!status_find.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_find;
}
if(!has_more_results){
state->find_is_done = true;
if(state->request_counter == 0){
state->handler(state->status); //exit
}
return false;
}
return true;
};
//Asynchronous call to Find
fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
}
/* block until promise is set */
hdfs::Status status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -0,0 +1,194 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include <future>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_chmod [OPTION] <MODE[,MODE]... | OCTALMODE> FILE"
<< std::endl
<< std::endl << "Change the permissions of each FILE to MODE."
<< std::endl << "The user must be the owner of the file, or else a super-user."
<< std::endl << "Additional information is in the Permissions Guide:"
<< std::endl << "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html"
<< std::endl
<< std::endl << " -R operate on files and directories recursively"
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_chmod -R 755 hdfs://localhost.localdomain:9433/dir/file"
<< std::endl << "hdfs_chmod 777 /dir/file"
<< std::endl;
}
struct SetPermissionState {
const uint16_t permissions;
const std::function<void(const hdfs::Status &)> handler;
//The request counter is incremented once every time SetOwner async call is made
uint64_t request_counter;
//This boolean will be set when find returns the last result
bool find_is_done;
//Final status to be returned
hdfs::Status status;
//Shared variables will need protection with a lock
std::mutex lock;
SetPermissionState(const uint16_t permissions_, const std::function<void(const hdfs::Status &)> & handler_,
uint64_t request_counter_, bool find_is_done_)
: permissions(permissions_),
handler(handler_),
request_counter(request_counter_),
find_is_done(find_is_done_),
status(),
lock() {
}
};
int main(int argc, char *argv[]) {
//We should have 3 or 4 parameters
if (argc != 3 && argc != 4) {
usage();
exit(EXIT_FAILURE);
}
bool recursive = false;
int input;
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "Rh")) != -1) {
switch (input)
{
case 'R':
recursive = 1;
break;
case 'h':
usage();
exit(EXIT_SUCCESS);
break;
case '?':
if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string permissions = argv[optind];
std::string uri_path = argv[optind + 1];
//Building a URI object from the given uri_path
hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path);
if (!uri) {
std::cerr << "Malformed URI: " << uri_path << std::endl;
exit(EXIT_FAILURE);
}
//TODO: HDFS-9539 Currently options can be returned empty
hdfs::Options options = *hdfs::getOptions();
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
options.rpc_timeout = std::numeric_limits<int>::max();
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
/* wrap async FileSystem::SetPermission with promise to make it a blocking call */
std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
std::future<hdfs::Status> future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) {
promise->set_value(s);
};
//strtol() is reading the value with base 8, NULL because we are reading in just one value.
uint16_t perm = strtol(permissions.c_str(), NULL, 8);
if(!recursive){
fs->SetPermission(uri->get_path(), perm, handler);
}
else {
//Allocating shared state, which includes:
//username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done
std::shared_ptr<SetPermissionState> state = std::make_shared<SetPermissionState>(perm, handler, 0, false);
// Keep requesting more from Find until we process the entire listing. Call handler when Find is done and reques counter is 0.
// Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
//For each result returned by Find we call async SetOwner with the handler below.
//SetOwner DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetOwner.
auto handlerSetOwner = [state](const hdfs::Status &status_set_owner) {
std::lock_guard<std::mutex> guard(state->lock);
//Decrement the counter once since we are done with this async call
if (!status_set_owner.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_set_owner;
}
state->request_counter--;
if(state->request_counter == 0 && state->find_is_done){
state->handler(state->status); //exit
}
};
if(!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const& s : stat_infos) {
//Launch an asynchronous call to SetOwner for every returned result
state->request_counter++;
fs->SetPermission(s.full_path, state->permissions, handlerSetOwner);
}
}
//Lock this section because handlerSetOwner might be accessing the same
//shared variables simultaneously
std::lock_guard<std::mutex> guard(state->lock);
if (!status_find.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_find;
}
if(!has_more_results){
state->find_is_done = true;
if(state->request_counter == 0){
state->handler(state->status); //exit
}
return false;
}
return true;
};
//Asynchronous call to Find
fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
}
/* block until promise is set */
hdfs::Status status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -0,0 +1,206 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include <future>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_chown [OPTION] [OWNER][:[GROUP]] FILE"
<< std::endl
<< std::endl << "Change the owner and/or group of each FILE to OWNER and/or GROUP."
<< std::endl << "The user must be a super-user. Additional information is in the Permissions Guide:"
<< std::endl << "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html"
<< std::endl
<< std::endl << " -R operate on files and directories recursively"
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Owner is unchanged if missing. Group is unchanged if missing."
<< std::endl << "OWNER and GROUP may be numeric as well as symbolic."
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_chown -R new_owner:new_group hdfs://localhost.localdomain:9433/dir/file"
<< std::endl << "hdfs_chown new_owner /dir/file"
<< std::endl;
}
struct SetOwnerState {
const std::string username;
const std::string groupname;
const std::function<void(const hdfs::Status &)> handler;
//The request counter is incremented once every time SetOwner async call is made
uint64_t request_counter;
//This boolean will be set when find returns the last result
bool find_is_done;
//Final status to be returned
hdfs::Status status;
//Shared variables will need protection with a lock
std::mutex lock;
SetOwnerState(const std::string & username_, const std::string & groupname_,
const std::function<void(const hdfs::Status &)> & handler_,
uint64_t request_counter_, bool find_is_done_)
: username(username_),
groupname(groupname_),
handler(handler_),
request_counter(request_counter_),
find_is_done(find_is_done_),
status(),
lock() {
}
};
int main(int argc, char *argv[]) {
//We should have 3 or 4 parameters
if (argc != 3 && argc != 4) {
usage();
exit(EXIT_FAILURE);
}
bool recursive = false;
int input;
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "Rh")) != -1) {
switch (input)
{
case 'R':
recursive = 1;
break;
case 'h':
usage();
exit(EXIT_SUCCESS);
break;
case '?':
if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string owner_and_group = argv[optind];
std::string uri_path = argv[optind + 1];
std::string owner, group;
size_t owner_end = owner_and_group.find(":");
if(owner_end == std::string::npos) {
owner = owner_and_group;
} else {
owner = owner_and_group.substr(0, owner_end);
group = owner_and_group.substr(owner_end + 1);
}
//Building a URI object from the given uri_path
hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path);
if (!uri) {
std::cerr << "Malformed URI: " << uri_path << std::endl;
exit(EXIT_FAILURE);
}
//TODO: HDFS-9539 Currently options can be returned empty
hdfs::Options options = *hdfs::getOptions();
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
options.rpc_timeout = std::numeric_limits<int>::max();
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
/* wrap async FileSystem::SetOwner with promise to make it a blocking call */
std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
std::future<hdfs::Status> future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) {
promise->set_value(s);
};
if(!recursive){
fs->SetOwner(uri->get_path(), owner, group, handler);
}
else {
//Allocating shared state, which includes:
//username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done
std::shared_ptr<SetOwnerState> state = std::make_shared<SetOwnerState>(owner, group, handler, 0, false);
// Keep requesting more from Find until we process the entire listing. Call handler when Find is done and reques counter is 0.
// Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
//For each result returned by Find we call async SetOwner with the handler below.
//SetOwner DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetOwner.
auto handlerSetOwner = [state](const hdfs::Status &status_set_owner) {
std::lock_guard<std::mutex> guard(state->lock);
//Decrement the counter once since we are done with this async call
if (!status_set_owner.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_set_owner;
}
state->request_counter--;
if(state->request_counter == 0 && state->find_is_done){
state->handler(state->status); //exit
}
};
if(!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const& s : stat_infos) {
//Launch an asynchronous call to SetOwner for every returned result
state->request_counter++;
fs->SetOwner(s.full_path, state->username, state->groupname, handlerSetOwner);
}
}
//Lock this section because handlerSetOwner might be accessing the same
//shared variables simultaneously
std::lock_guard<std::mutex> guard(state->lock);
if (!status_find.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_find;
}
if(!has_more_results){
state->find_is_done = true;
if(state->request_counter == 0){
state->handler(state->status); //exit
}
return false;
}
return true;
};
//Asynchronous call to Find
fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
}
/* block until promise is set */
hdfs::Status status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -0,0 +1,156 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include <future>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_find [OPTION] PATH"
<< std::endl
<< std::endl << "Finds all files recursively starting from the"
<< std::endl << "specified PATH and prints their file paths."
<< std::endl << "This hdfs_find tool mimics the POSIX find."
<< std::endl
<< std::endl << "Both PATH and NAME can have wild-cards."
<< std::endl
<< std::endl << " -n NAME if provided all results will be matching the NAME pattern"
<< std::endl << " otherwise, the implicit '*' will be used"
<< std::endl << " NAME allows wild-cards"
<< std::endl
<< std::endl << " -m MAX_DEPTH if provided the maximum depth to recurse after the end of"
<< std::endl << " the path is reached will be limited by MAX_DEPTH"
<< std::endl << " otherwise, the maximum depth to recurse is unbound"
<< std::endl << " MAX_DEPTH can be set to 0 for pure globbing and ignoring"
<< std::endl << " the NAME option (no recursion after the end of the path)"
<< std::endl
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_find hdfs://localhost.localdomain:9433/dir?/tree* -n some?file*name"
<< std::endl << "hdfs_find / -n file_name -m 3"
<< std::endl;
}
int main(int argc, char *argv[]) {
//We should have at least 2 arguments
if (argc < 2) {
usage();
exit(EXIT_FAILURE);
}
int input;
//If NAME is not specified we use implicit "*"
std::string name = "*";
//If MAX_DEPTH is not specified we use the max value of uint_32_t
uint32_t max_depth = hdfs::FileSystem::GetDefaultFindMaxDepth();
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "hn:m:")) != -1) {
switch (input)
{
case 'h':
usage();
exit(EXIT_SUCCESS);
break;
case 'n':
name = optarg;
break;
case 'm':
max_depth = std::stoi(optarg);
break;
case '?':
if (optopt == 'n' || optopt == 'm')
std::cerr << "Option -" << (char) optopt << " requires an argument." << std::endl;
else if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string uri_path = argv[optind];
//Building a URI object from the given uri_path
hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path);
if (!uri) {
std::cerr << "Malformed URI: " << uri_path << std::endl;
exit(EXIT_FAILURE);
}
//TODO: HDFS-9539 Currently options can be returned empty
hdfs::Options options = *hdfs::getOptions();
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
options.rpc_timeout = std::numeric_limits<int>::max();
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
std::promise<void> promise;
std::future<void> future(promise.get_future());
hdfs::Status status = hdfs::Status::OK();
/**
* Keep requesting more until we get the entire listing. Set the promise
* when we have the entire listing to stop.
*
* Find guarantees that the handler will only be called once at a time,
* so we do not need any locking here
*/
auto handler = [&promise, &status]
(const hdfs::Status &s, const std::vector<hdfs::StatInfo> & si, bool has_more_results) -> bool {
//Print result chunks as they arrive
if(!si.empty()) {
for (hdfs::StatInfo const& s : si) {
std::cout << s.full_path << std::endl;
}
}
if(!s.ok() && status.ok()){
//We make sure we set 'status' only on the first error.
status = s;
}
if (!has_more_results) {
promise.set_value(); //set promise
return false; //request stop sending results
}
return true; //request more results
};
//Asynchronous call to Find
fs->Find(uri->get_path(), name, max_depth, handler);
//block until promise is set
future.get();
if(!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -0,0 +1,70 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include "tools_common.h"
namespace hdfs {
std::shared_ptr<hdfs::Options> getOptions() {
std::shared_ptr<hdfs::Options> options = std::make_shared<hdfs::Options>();
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
hdfs::ConfigurationLoader loader;
//Loading default config files core-site.xml and hdfs-site.xml from the config path
hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
if(config){
//Loading options from the config
*options = config->GetOptions();
}
return options;
}
std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, hdfs::Options & options) {
IoService * io_service = IoService::New();
//Wrapping fs into a shared pointer to guarantee deletion
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
if (!fs) {
std::cerr << "Could not create FileSystem object. " << std::endl;
exit(EXIT_FAILURE);
}
Status status;
//Check if the user supplied the host
if(!uri.get_host().empty()){
//If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
status = fs->Connect(uri.get_host(), port);
if (!status.ok()) {
std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
} else {
status = fs->ConnectToDefaultFs();
if (!status.ok()) {
if(!options.defaultFS.get_host().empty()){
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
} else {
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
}
exit(EXIT_FAILURE);
}
}
return fs;
}
}

View File

@ -0,0 +1,39 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#ifndef TOOLS_COMMON_H_
#define TOOLS_COMMON_H_
#include "hdfspp/hdfspp.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include <mutex>
namespace hdfs {
//Pull configurations and get the Options object
std::shared_ptr<hdfs::Options> getOptions();
//Build all necessary objects and perform the connection
std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, hdfs::Options & options);
}
#endif