From 4f6cb5d1a150b366516ddfc3409925c26a1a2a6a Mon Sep 17 00:00:00 2001 From: James Date: Fri, 26 Aug 2016 12:52:05 -0400 Subject: [PATCH] HDFS-10754: libhdfs++: Create tools directory and implement hdfs_cat, hdfs_chgrp, hdfs_chown, hdfs_chmod and hdfs_find. Contributed by Anatoli Shein. --- .../src/main/native/libhdfspp/CMakeLists.txt | 1 + .../libhdfspp/examples/cpp/CMakeLists.txt | 3 +- .../native/libhdfspp/examples/cpp/cat/cat.cpp | 124 ++++----- .../examples/cpp/find/CMakeLists.txt | 35 +++ .../libhdfspp/examples/cpp/find/find.cpp | 162 ++++++++++++ .../examples/cpp/gendirs/gendirs.cpp | 116 ++++----- .../libhdfspp/include/hdfspp/hdfs_ext.h | 14 + .../native/libhdfspp/include/hdfspp/hdfspp.h | 45 +++- .../libhdfspp/include/hdfspp/statinfo.h | 1 + .../native/libhdfspp/lib/bindings/c/hdfs.cc | 64 ++++- .../lib/common/configuration_loader.cc | 8 +- .../lib/common/configuration_loader.h | 8 +- .../lib/common/configuration_loader_impl.h | 2 +- .../native/libhdfspp/lib/fs/filesystem.cc | 244 ++++++++++++++++-- .../main/native/libhdfspp/lib/fs/filesystem.h | 68 ++++- .../libhdfspp/lib/fs/namenode_operations.cc | 82 ++---- .../libhdfspp/lib/fs/namenode_operations.h | 8 +- .../libhdfspp/tests/configuration_test.cc | 84 ++++-- .../libhdfspp/tests/configuration_test.h | 4 +- .../tests/hdfs_configuration_test.cc | 9 +- .../main/native/libhdfspp/tests/hdfs_shim.c | 4 + .../libhdfspp/tests/libhdfs_wrapper_undefs.h | 1 + .../tests/libhdfspp_wrapper_defines.h | 1 + .../native/libhdfspp/tools/CMakeLists.txt | 42 +++ .../main/native/libhdfspp/tools/hdfs_cat.cpp | 120 +++++++++ .../native/libhdfspp/tools/hdfs_chgrp.cpp | 196 ++++++++++++++ .../native/libhdfspp/tools/hdfs_chmod.cpp | 194 ++++++++++++++ .../native/libhdfspp/tools/hdfs_chown.cpp | 206 +++++++++++++++ .../main/native/libhdfspp/tools/hdfs_find.cpp | 156 +++++++++++ .../native/libhdfspp/tools/tools_common.cpp | 70 +++++ .../native/libhdfspp/tools/tools_common.h | 39 +++ 31 files changed, 1828 insertions(+), 283 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_cat.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chgrp.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chmod.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chown.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_find.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt index a663e6038b..da564031a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt 
@@ -163,6 +163,7 @@ add_subdirectory(third_party/uriparser2) add_subdirectory(lib) add_subdirectory(tests) add_subdirectory(examples) +add_subdirectory(tools) # create an empty file; hadoop_add_dual_library wraps add_library which # requires at least one file as an argument diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt index 183299ad62..9e16b0b81a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt @@ -17,4 +17,5 @@ # add_subdirectory(cat) -add_subdirectory(gendirs) \ No newline at end of file +add_subdirectory(gendirs) +add_subdirectory(find) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp index bfab50717f..17626ea92f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp @@ -17,105 +17,91 @@ under the License. */ -/* - A a stripped down version of unix's "cat". - Doesn't deal with any flags for now, will just attempt to read the whole file. -*/ +/** + * Unix-like cat tool example. + * + * Reads the specified file from HDFS and outputs to stdout. + * + * Usage: cat / + * + * Example: cat /dir/file + * + * @param path-to-file Absolute path to the file to read. + * + **/ #include "hdfspp/hdfspp.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" -#include "common/uri.h" -#include +#include -using namespace std; -using namespace hdfs; - -#define SCHEME "hdfs" +#define BUF_SIZE 4096 int main(int argc, char *argv[]) { if (argc != 2) { - cerr << "usage: cat [hdfs://[:]]/" << endl; - return 1; + std::cerr << "usage: cat /" << std::endl; + exit(EXIT_FAILURE); } + std::string path = argv[1]; - optional uri; - const string uri_path = argv[1]; - - //Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind - size_t scheme_end = uri_path.find("://"); - if (scheme_end != string::npos) { - if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) { - cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl; - return 1; - } else { - uri = URI::parse_from_string(uri_path); - } - } - if (!uri) { - cerr << "Malformed URI: " << uri_path << endl; - return 1; - } - - ConfigurationLoader loader; - optional config = loader.LoadDefaultResources(); - const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR"); - if (envHadoopConfDir && (*envHadoopConfDir != 0) ) { - config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml"); - } - - Options options; + hdfs::Options options; + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional config = loader.LoadDefaultResources(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. 
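+  //(Note: LoadDefaultResources returns an hdfs::optional, which is empty when
+  //no valid configuration could be loaded from the search path, hence the
+  //check below.)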
if(config){ + //Loading options from the config options = config->GetOptions(); } - - IoService * io_service = IoService::New(); - - FileSystem *fs_raw = FileSystem::New(io_service, "", options); - if (!fs_raw) { - cerr << "Could not create FileSystem object" << endl; - return 1; + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping fs into a shared pointer to guarantee deletion + std::shared_ptr fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not connect the file system." << std::endl; + exit(EXIT_FAILURE); } - //wrapping fs_raw into a unique pointer to guarantee deletion - unique_ptr fs(fs_raw); - - Status stat = fs->Connect(uri->get_host(), to_string(*(uri->get_port()))); - if (!stat.ok()) { - cerr << "Could not connect to " << uri->get_host() << ":" << *(uri->get_port()) << endl; - return 1; + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); } - FileHandle *file_raw = nullptr; - stat = fs->Open(uri->get_path(), &file_raw); - if (!stat.ok()) { - cerr << "Could not open file " << uri->get_path() << endl; - return 1; + hdfs::FileHandle *file_raw = nullptr; + status = fs->Open(path, &file_raw); + if (!status.ok()) { + std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl; + exit(EXIT_FAILURE); } //wrapping file_raw into a unique pointer to guarantee deletion - unique_ptr file(file_raw); + std::unique_ptr file(file_raw); - char input_buffer[4096]; - ssize_t read_bytes_count = 0; - size_t last_read_bytes = 0; + char input_buffer[BUF_SIZE]; + ssize_t total_bytes_read = 0; + size_t last_bytes_read = 0; do{ //Reading file chunks - Status stat = file->PositionRead(input_buffer, sizeof(input_buffer), read_bytes_count, &last_read_bytes); - if(stat.ok()) { + status = file->Read(input_buffer, sizeof(input_buffer), &last_bytes_read); + if(status.ok()) { //Writing file chunks to stdout - fwrite(input_buffer, last_read_bytes, 1, stdout); - read_bytes_count += last_read_bytes; + fwrite(input_buffer, last_bytes_read, 1, stdout); + total_bytes_read += last_bytes_read; } else { - if(stat.is_invalid_offset()){ + if(status.is_invalid_offset()){ //Reached the end of the file break; } else { - cerr << "Error reading the file: " << stat.ToString() << endl; - return 1; + std::cerr << "Error reading the file: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); } } - } while (last_read_bytes > 0); + } while (last_bytes_read > 0); // Clean up static data and prevent valgrind memory leaks google::protobuf::ShutdownProtobufLibrary(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt new file mode 100644 index 0000000000..7ae27c5b9f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default LIBHDFSPP_DIR to the default install location. You can override +# it by add -DLIBHDFSPP_DIR=... to your cmake invocation +set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX}) + +include_directories( ${LIBHDFSPP_DIR}/include ) +link_directories( ${LIBHDFSPP_DIR}/lib ) + +add_executable(find_cpp find.cpp) +target_link_libraries(find_cpp hdfspp) + +# Several examples in different languages need to produce executables with +# same names. To allow executables with same names we keep their CMake +# names different, but specify their executable names as follows: +set_target_properties( find_cpp + PROPERTIES + OUTPUT_NAME "find" +) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp new file mode 100644 index 0000000000..21a731bc49 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp @@ -0,0 +1,162 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +/** + * A parallel find tool example. + * + * Finds all files matching the specified name recursively starting from the + * specified directory and prints their filepaths. Works either synchronously + * or asynchronously. + * + * Usage: find / + * + * Example: find /dir?/tree* some?file*name 1 + * + * @param path-to-file Absolute path at which to begin search, can have wild + * cards and must be non-blank + * @param file-name Name to find, can have wild cards and must be non-blank + * @param use_async If set to 1 it prints out results asynchronously as + * they arrive. If set to 0 results are printed in one + * big chunk when it becomes available. 
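+ *                     (Both modes are sketched in the AsyncFind and SyncFind
+ *                     helpers below.)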
+ * + **/ + +#include "hdfspp/hdfspp.h" +#include "common/hdfs_configuration.h" +#include "common/configuration_loader.h" + +#include +#include + +void SyncFind(std::shared_ptr fs, const std::string &path, const std::string &name){ + std::vector results; + //Synchronous call to Find + hdfs::Status stat = fs->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &results); + + if (!stat.ok()) { + std::cerr << "Error: " << stat.ToString() << std::endl; + } + + if(results.empty()){ + std::cout << "Nothing Found" << std::endl; + } else { + //Printing out the results + for (hdfs::StatInfo const& si : results) { + std::cout << si.full_path << std::endl; + } + } +} + +void AsyncFind(std::shared_ptr fs, const std::string &path, const std::string &name){ + std::promise promise; + std::future future(promise.get_future()); + bool something_found = false; + hdfs::Status status = hdfs::Status::OK(); + + /** + * Keep requesting more until we get the entire listing. Set the promise + * when we have the entire listing to stop. + * + * Find guarantees that the handler will only be called once at a time, + * so we do not need any locking here + */ + auto handler = [&promise, &status, &something_found] + (const hdfs::Status &s, const std::vector & si, bool has_more_results) -> bool { + //Print result chunks as they arrive + if(!si.empty()) { + something_found = true; + for (hdfs::StatInfo const& s : si) { + std::cout << s.full_path << std::endl; + } + } + if(!s.ok() && status.ok()){ + //We make sure we set 'status' only on the first error. + status = s; + } + if (!has_more_results) { + promise.set_value(); //set promise + return false; //request stop sending results + } + return true; //request more results + }; + + //Asynchronous call to Find + fs->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), handler); + + //block until promise is set + future.get(); + if(!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + } + if(!something_found){ + std::cout << "Nothing Found" << std::endl; + } +} + +int main(int argc, char *argv[]) { + if (argc != 4) { + std::cerr << "usage: find / " << std::endl; + exit(EXIT_FAILURE); + } + + std::string path = argv[1]; + std::string name = argv[2]; + bool use_async = (std::stoi(argv[3]) != 0); + + hdfs::Options options; + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional config = loader.LoadDefaultResources(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. + if(config){ + //Loading options from the config + options = config->GetOptions(); + } + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits::max(); + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping fs into a unique pointer to guarantee deletion + std::shared_ptr fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not connect the file system." << std::endl; + exit(EXIT_FAILURE); + } + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. 
" << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); + } + + if (use_async){ + //Example of Async find + AsyncFind(fs, path, name); + } else { + //Example of Sync find + SyncFind(fs, path, name); + } + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp index 9f5ae5a1e7..c90abbd2b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp @@ -23,9 +23,9 @@ * Generates a directory tree with specified depth and fanout starting from * a given path. Generation is asynchronous. * - * Usage: gendirs [hdfs://[:]]/ + * Usage: gendirs / * - * Example: gendirs hdfs://localhost.localdomain:9433/dir0 3 10 + * Example: gendirs /dir0 3 10 * * @param path-to-dir Absolute path to the directory tree root where the * directory tree will be generated @@ -37,99 +37,76 @@ **/ #include "hdfspp/hdfspp.h" -#include "fs/namenode_operations.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" -#include "common/uri.h" -#include +#include #include -using namespace std; -using namespace hdfs; +#define DEFAULT_PERMISSIONS 0755 -#define SCHEME "hdfs" - -void GenerateDirectories (shared_ptr fs, int depth, int level, int fanout, string path, vector> & futures) { +void GenerateDirectories (std::shared_ptr fs, int depth, int level, int fanout, std::string path, std::vector> & futures) { //Level contains our current depth in the directory tree if(level < depth) { for(int i = 0; i < fanout; i++){ //Recursive calls to cover all possible paths from the root to the leave nodes - GenerateDirectories(fs, depth, level+1, fanout, path + "dir" + to_string(i) + "/", futures); + GenerateDirectories(fs, depth, level+1, fanout, path + "dir" + std::to_string(i) + "/", futures); } } else { //We have reached the leaf nodes and now start making calls to create directories //We make a promise which will be set when the call finishes and executes our handler - auto callstate = make_shared>(); + auto callstate = std::make_shared>(); //Extract a future from this promise - future future(callstate->get_future()); + std::future future(callstate->get_future()); //Save this future to the vector of futures which will be used to wait on all promises //after the whole recursion is done - futures.push_back(move(future)); + futures.push_back(std::move(future)); //Create a handler that will be executed when Mkdirs is done - auto handler = [callstate](const Status &s) { + auto handler = [callstate](const hdfs::Status &s) { callstate->set_value(s); }; //Asynchronous call to create this directory along with all missing parent directories - fs->Mkdirs(path, NameNodeOperations::GetDefaultPermissionMask(), true, handler); + fs->Mkdirs(path, DEFAULT_PERMISSIONS, true, handler); } } int main(int argc, char *argv[]) { if (argc != 4) { - cerr << "usage: gendirs [hdfs://[:]]/ " << endl; - return 1; + std::cerr << "usage: gendirs / " << std::endl; + exit(EXIT_FAILURE); } - optional uri; - const string uri_path = argv[1]; - const int depth = stoi(argv[2]); - const int fanout = stoi(argv[3]); + std::string path = argv[1]; + int depth = std::stoi(argv[2]); + int fanout = 
std::stoi(argv[3]); - //Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind - size_t scheme_end = uri_path.find("://"); - if (scheme_end != string::npos) { - if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) { - cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl; - return 1; - } else { - uri = URI::parse_from_string(uri_path); - } - } - if (!uri) { - cerr << "Malformed URI: " << uri_path << endl; - return 1; - } - - ConfigurationLoader loader; - optional config = loader.LoadDefaultResources(); - const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR"); - if (envHadoopConfDir && (*envHadoopConfDir != 0) ) { - config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml"); - } - - Options options; - options.rpc_timeout = numeric_limits::max(); + hdfs::Options options; + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional config = loader.LoadDefaultResources(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. if(config){ + //Loading options from the config options = config->GetOptions(); } - - IoService * io_service = IoService::New(); - - FileSystem *fs_raw = FileSystem::New(io_service, "", options); - if (!fs_raw) { - cerr << "Could not create FileSystem object" << endl; - return 1; + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits::max(); + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping fs into a unique pointer to guarantee deletion + std::shared_ptr fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not connect the file system." << std::endl; + exit(EXIT_FAILURE); } - //Wrapping fs_raw into a unique pointer to guarantee deletion - shared_ptr fs(fs_raw); - - //Get port from the uri, otherwise use the default port - string port = to_string(uri->get_port().value_or(8020)); - Status stat = fs->Connect(uri->get_host(), port); - if (!stat.ok()) { - cerr << "Could not connect to " << uri->get_host() << ":" << port << endl; - return 1; + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); } /** @@ -143,22 +120,23 @@ int main(int argc, char *argv[]) { * processed. After the whole recursion is complete we will need to wait until * all promises are set before we can exit. **/ - vector> futures; + std::vector> futures; - GenerateDirectories(fs, depth, 0, fanout, uri->get_path() + "/", futures); + GenerateDirectories(fs, depth, 0, fanout, path + "/", futures); /** * We are waiting here until all promises are set, and checking whether * the returned statuses contained any errors. **/ - for(future &fs : futures){ - Status stat = fs.get(); - if (!stat.ok()) { - cerr << "Error: " << stat.ToString() << endl; + for(std::future &fs : futures){ + hdfs::Status status = fs.get(); + if (!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); } } - cout << "All done!" 
<< endl; + std::cout << "All done!" << std::endl; // Clean up static data and prevent valgrind memory leaks google::protobuf::ShutdownProtobufLibrary(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index ce9f0f5034..b41857ca14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -287,6 +287,20 @@ LIBHDFS_EXTERNAL int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie); +/** + * Finds file name on the file system. hdfsFreeFileInfo should be called to deallocate memory. + * + * @param fs The filesystem (required) + * @param path Path at which to begin search, can have wild cards (must be non-blank) + * @param name Name to find, can have wild cards (must be non-blank) + * @param numEntries Set to the number of files/directories in the result. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL on error or empty result. + * errno is set to non-zero on error or zero on success. + **/ +hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries); + + /***************************************************************************** * HDFS SNAPSHOT FUNCTIONS ****************************************************************************/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 46fe8e9bb3..4b88fe5ef6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -138,6 +138,18 @@ public: **/ class FileSystem { public: + //Returns the default maximum depth for recursive Find tool + static uint32_t GetDefaultFindMaxDepth(); + + //Returns the default permission mask + static uint16_t GetDefaultPermissionMask(); + + //Checks if the given permission mask is valid + static Status CheckValidPermissionMask(uint16_t permissions); + + //Checks if replication value is valid + static Status CheckValidReplication(uint16_t replication); + /** * Create a new instance of the FileSystem object. The call * initializes the RPC connections to the NameNode and returns an @@ -236,7 +248,7 @@ class FileSystem { * * The asynchronous method will return batches of files; the consumer must * return true if they want more files to be delivered. The final bool - * parameter in the callback will be set to true if this is the final + * parameter in the callback will be set to false if this is the final * batch of files. * * The synchronous method will return all files in the directory. 
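For context, the batch-consumer contract described above can be exercised as in the following sketch. This is not part of the patch; the ListAll wrapper and the already-connected fs pointer are assumed here purely for illustration. The handler returns true to request the next batch, and the final callback arrives with the bool set to false:

    #include "hdfspp/hdfspp.h"
    #include <future>
    #include <iostream>

    void ListAll(hdfs::FileSystem *fs, const std::string &path) {
      std::promise<hdfs::Status> done;
      auto handler = [&done](const hdfs::Status &s,
                             const std::vector<hdfs::StatInfo> &batch,
                             bool has_more) -> bool {
        for (hdfs::StatInfo const &si : batch)
          std::cout << si.full_path << std::endl;  //print this batch
        if (!s.ok() || !has_more) {                //error, or final batch
          done.set_value(s);
          return false;                            //stop delivery
        }
        return true;                               //request the next batch
      };
      fs->GetListing(path, handler);
      hdfs::Status final_status = done.get_future().get();  //block until done
      if (!final_status.ok())
        std::cerr << final_status.ToString() << std::endl;
    }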
@@ -245,9 +257,8 @@ class FileSystem { **/ virtual void GetListing(const std::string &path, - const std::function> &, bool)> &handler) = 0; - virtual Status GetListing(const std::string &path, - std::shared_ptr> & stat_infos) = 0; + const std::function &, bool)> &handler) = 0; + virtual Status GetListing(const std::string &path, std::vector * stat_infos) = 0; /** * Returns the locations of all known blocks for the indicated file (or part of it), or an error @@ -297,8 +308,8 @@ class FileSystem { * @param path the path to the file or directory * @param permissions the bitmask to set it to (should be between 0 and 01777) */ - virtual void SetPermission(const std::string & path, - uint16_t permissions, const std::function &handler) = 0; + virtual void SetPermission(const std::string & path, uint16_t permissions, + const std::function &handler) = 0; virtual Status SetPermission(const std::string & path, uint16_t permissions) = 0; /** @@ -307,12 +318,34 @@ class FileSystem { * @param path file path * @param username If it is empty, the original username remains unchanged. * @param groupname If it is empty, the original groupname remains unchanged. + * @param recursive If true, the change will be propagated recursively. */ virtual void SetOwner(const std::string & path, const std::string & username, const std::string & groupname, const std::function &handler) = 0; virtual Status SetOwner(const std::string & path, const std::string & username, const std::string & groupname) = 0; + /** + * Finds all files matching the specified name recursively starting from the + * specified directory. Returns metadata for each of them. + * + * Example: Find("/dir?/tree*", "some?file*name") + * + * @param path Absolute path at which to begin search, can have wild cards (must be non-blank) + * @param name Name to find, can also have wild cards (must be non-blank) + * + * The asynchronous method will return batches of files; the consumer must + * return true if they want more files to be delivered. The final bool + * parameter in the callback will be set to false if this is the final + * batch of files. + * + * The synchronous method will return matching files. + **/ + virtual void + Find(const std::string &path, const std::string &name, const uint32_t maxdepth, + const std::function & , bool)> &handler) = 0; + virtual Status Find(const std::string &path, const std::string &name, + const uint32_t maxdepth, std::vector * stat_infos) = 0; /***************************************************************************** diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h index a53ab8b609..e077ddaff5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h @@ -33,6 +33,7 @@ struct StatInfo { int file_type; ::std::string path; + ::std::string full_path; unsigned long int length; unsigned long int permissions; //Octal number as in POSIX permissions; e.g. 
0777 ::std::string owner; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 4003358fc3..a43d94f661 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -731,22 +731,21 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { if(!abs_path) { return nullptr; } - std::shared_ptr> stat_infos; - Status stat = fs->get_impl()->GetListing(*abs_path, stat_infos); + std::vector stat_infos; + Status stat = fs->get_impl()->GetListing(*abs_path, &stat_infos); if (!stat.ok()) { Error(stat); *numEntries = 0; return nullptr; } - //Existing API expects nullptr if size is 0 - if(!stat_infos || stat_infos->size()==0){ + if(stat_infos.empty()){ *numEntries = 0; return nullptr; } - *numEntries = stat_infos->size(); - hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos->size()]; - for(std::vector::size_type i = 0; i < stat_infos->size(); i++) { - StatInfoToHdfsFileInfo(&file_infos[i], stat_infos->at(i)); + *numEntries = stat_infos.size(); + hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()]; + for(std::vector::size_type i = 0; i < stat_infos.size(); i++) { + StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i)); } return file_infos; @@ -785,7 +784,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) { } Status stat; //Use default permissions and set true for creating all non-existant parent directories - stat = fs->get_impl()->Mkdirs(*abs_path, NameNodeOperations::GetDefaultPermissionMask(), true); + stat = fs->get_impl()->Mkdirs(*abs_path, FileSystem::GetDefaultPermissionMask(), true); if (!stat.ok()) { return Error(stat); } @@ -854,7 +853,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){ if(!abs_path) { return -1; } - Status stat = NameNodeOperations::CheckValidPermissionMask(mode); + Status stat = FileSystem::CheckValidPermissionMask(mode); if (!stat.ok()) { return Error(stat); } @@ -896,6 +895,44 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) } } +hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){ + try { + errno = 0; + if (!CheckSystem(fs)) { + *numEntries = 0; + return nullptr; + } + + std::vector stat_infos; + Status stat = fs->get_impl()->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &stat_infos); + if (!stat.ok()) { + Error(stat); + *numEntries = 0; + return nullptr; + } + //Existing API expects nullptr if size is 0 + if(stat_infos.empty()){ + *numEntries = 0; + return nullptr; + } + *numEntries = stat_infos.size(); + hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()]; + for(std::vector::size_type i = 0; i < stat_infos.size(); i++) { + StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i)); + } + + return file_infos; + } catch (const std::exception & e) { + ReportException(e); + *numEntries = 0; + return nullptr; + } catch (...) 
{
+    ReportCaughtNonException();
+    *numEntries = 0;
+    return nullptr;
+  }
+}
+
 int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
   try {
     errno = 0;
@@ -1373,19 +1410,18 @@ HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
   }
   else
   {
-    return loader.New<HdfsConfiguration>();
+    return loader.NewConfig<HdfsConfiguration>();
   }
 }
 
-hdfsBuilder::hdfsBuilder() : config(loader.New<HdfsConfiguration>())
+hdfsBuilder::hdfsBuilder() : config(loader.NewConfig<HdfsConfiguration>())
 {
   errno = 0;
-  loader.SetDefaultSearchPath();
   config = LoadDefault(loader);
 }
 
 hdfsBuilder::hdfsBuilder(const char * directory) :
-                      config(loader.New<HdfsConfiguration>())
+                      config(loader.NewConfig<HdfsConfiguration>())
 {
   errno = 0;
   loader.SetSearchPath(directory);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc
index 1eb70c3b0e..e1434daddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc
@@ -62,6 +62,12 @@ bool str_to_bool(const std::string& raw) {
   return false;
 }
 
+ConfigurationLoader::ConfigurationLoader() {
+  //In order to create a configuration loader with the default search path
+  //("$HADOOP_CONF_DIR" or "/etc/hadoop/conf") we call SetDefaultSearchPath().
+  ConfigurationLoader::SetDefaultSearchPath();
+}
+
 void ConfigurationLoader::SetDefaultSearchPath() {
   // Try (in order, taking the first valid one):
   //    $HADOOP_CONF_DIR
@@ -257,4 +263,4 @@ bool ConfigurationLoader::UpdateMapWithValue(ConfigMap& map,
 
   return true;
 }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h
index 059e48b04d..51ac23aa13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h
@@ -27,9 +27,9 @@ namespace hdfs {
 
 class ConfigurationLoader {
 public:
   // Creates a new, empty Configuration object
-  // T must be Configuration or a subclass
+  // T must be Configuration or a subclass
   template <class T>
-  T New();
+  T NewConfig();
 
 /****************************************************************************
  *                    LOADING CONFIG FILES
  ***************************************************************************/
@@ -79,6 +79,10 @@ public:
  *                    SEARCH PATH METHODS
  ***************************************************************************/
 
+  //Creates a configuration loader with the default search path ("$HADOOP_CONF_DIR" or "/etc/hadoop/conf").
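+  //Example (sketch; the exact call syntax is assumed for illustration):
+  //  hdfs::ConfigurationLoader loader;
+  //  auto config = loader.LoadDefaultResources<HdfsConfiguration>();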
+  //If you want to explicitly set the entire search path, call ClearSearchPath() first
+  ConfigurationLoader();
+
   // Sets the search path to the default search path (namely, "$HADOOP_CONF_DIR" or "/etc/hadoop/conf")
   void SetDefaultSearchPath();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h
index 9e18878cb2..6258450c0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h
@@ -23,7 +23,7 @@ namespace hdfs {
 
 template <class T>
-T ConfigurationLoader::New() {
+T ConfigurationLoader::NewConfig() {
   return T();
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index de6ebb7029..d75939fd93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <fnmatch.h>
 
 #define FMT_THIS_ADDR "this=" << (void*)this
 
@@ -39,6 +40,34 @@ using ::asio::ip::tcp;
 
 static constexpr uint16_t kDefaultPort = 8020;
 
+uint32_t FileSystem::GetDefaultFindMaxDepth() {
+  return std::numeric_limits<uint32_t>::max();
+}
+
+uint16_t FileSystem::GetDefaultPermissionMask() {
+  return 0755;
+}
+
+Status FileSystem::CheckValidPermissionMask(uint16_t permissions) {
+  if (permissions > 01777) {
+    std::stringstream errormsg;
+    errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
+             << std::showbase << permissions << " (should be between 0 and 01777)";
+    return Status::InvalidArgument(errormsg.str().c_str());
+  }
+  return Status::OK();
+}
+
+Status FileSystem::CheckValidReplication(uint16_t replication) {
+  if (replication < 1 || replication > 512) {
+    std::stringstream errormsg;
+    errormsg << "CheckValidReplication: argument 'replication' is "
+             << replication << " (should be between 1 and 512)";
+    return Status::InvalidArgument(errormsg.str().c_str());
+  }
+  return Status::OK();
+}
+
 /*****************************************************************************
  *                    FILESYSTEM BASE CLASS
  ****************************************************************************/
@@ -446,7 +475,7 @@ void FileSystemImpl::SetReplication(const std::string & path, int16_t replicatio
     handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
     return;
   }
-  Status replStatus = NameNodeOperations::CheckValidReplication(replication);
+  Status replStatus = FileSystem::CheckValidReplication(replication);
   if (!replStatus.ok()) {
     handler(replStatus);
     return;
@@ -593,44 +622,43 @@ Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
 * Some compilers don't like recursive lambdas, so we make the lambda call a
 * method, which in turn creates a lambda calling itself.
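 *
 * (A minimal sketch of that pattern, with hypothetical names: the lambda
 * captures `this` and re-enters the method, and the method builds a fresh
 * lambda for each hop.)
 *
 *   void Pager::NextBatch(std::function<bool(Batch)> cb) {
 *     auto shim = [this, cb](Batch b) {
 *       if (cb(b)) NextBatch(cb);   //calls the method, not the lambda itself
 *     };
 *     fetch_async(shim);            //hypothetical async data source
 *   }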
 */
-void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
-                      std::string path,
-                      const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
-  bool has_next = stat_infos && stat_infos->size() > 0;
+void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
+                      std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
+  bool has_next = !stat_infos.empty();
   bool get_more = handler(stat, stat_infos, has_more && has_next);
   if (get_more && has_more && has_next ) {
-    auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
+    auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
       GetListingShim(stat, stat_infos, has_more, path, handler);
     };
 
-    std::string last = stat_infos->back().path;
+    std::string last = stat_infos.back().path;
     nn_.GetListing(path, callback, last);
   }
 }
 
 void FileSystemImpl::GetListing(
     const std::string &path,
-    const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
+    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
   LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing("
                                  << FMT_THIS_ADDR << ", path="
                                  << path << ") called");
 
   // Capture the state and push it into the shim
-  auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
+  auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
     GetListingShim(stat, stat_infos, has_more, path, handler);
   };
 
   nn_.GetListing(path, callback);
 }
 
-Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) {
+Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
   LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing("
                                  << FMT_THIS_ADDR << ", path="
                                  << path << ") called");
 
-  // In this case, we're going to allocate the result on the heap and have the
-  // async code populate it.
-  auto results = std::make_shared<std::vector<StatInfo>>();
+  if (!stat_infos) {
+    return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
+  }
 
   auto callstate = std::make_shared<std::promise<Status>>();
   std::future<Status> future(callstate->get_future());
@@ -640,9 +668,9 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr
 
-  auto h = [callstate, results](const Status &s, std::shared_ptr<std::vector<StatInfo>> si, bool has_more) -> bool {
-    if (si) {
-      results->insert(results->end(), si->begin(), si->end());
+  auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
+    if (!si.empty()) {
+      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
     }
 
     bool done = !s.ok() || !has_more;
@@ -658,11 +686,6 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr
+void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more,
+                      std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) {
+  //We buffer the outputs then send them back at the end
+  std::vector<StatInfo> outputs;
+  //Return on error
+  if(!stat.ok()){
+    std::lock_guard<std::mutex> find_lock(shared_state->lock);
+    //We send true because we do not want the user code to exit before all our requests finished
+    shared_state->handler(stat, outputs, true);
+    shared_state->aborted = true;
+  }
+  if(!shared_state->aborted){
+    //User did not abort the operation
+    if (directory_has_more) {
+      //Directory is large and has more results
+      //We launch another async call to get more results
+      shared_state->outstanding_requests++;
+      auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+        FindShim(stat, stat_infos, has_more, operational_state, shared_state);
+      };
+      std::string last = stat_infos.back().path;
+      nn_.GetListing(operational_state->path, callback, last);
+    }
+    if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){
+      //We are searching for the path and did not reach the end of the path yet
+      for (StatInfo const& si : stat_infos) {
+        //If we are at the last depth and it matches both path and name, we need to output it.
+        if (operational_state->depth == shared_state->dirs.size() - 2
+            && !fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)
+            && !fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)) {
+          outputs.push_back(si);
+        }
+        //Skip if not directory
+        if(si.file_type != StatInfo::IS_DIR) {
+          continue;
+        }
+        //Checking for a match with the path at the current depth
+        if(!fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)){
+          //Launch a new request for every matched directory
+          shared_state->outstanding_requests++;
+          auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+            std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true);  //true because searching for the path
+            FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
+          };
+          nn_.GetListing(si.full_path, callback);
+        }
+      }
+    }
+    else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){
+      //We are searching for the name now and maxdepth has not been reached
+      for (StatInfo const& si : stat_infos) {
+        //Launch a new request for every directory
+        if(si.file_type == StatInfo::IS_DIR) {
+          shared_state->outstanding_requests++;
+          auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+            std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false);  //false because searching for the name
+            FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
+          };
+          nn_.GetListing(si.full_path, callback);
+        }
+        //All names that match the specified name are saved to outputs
+        if(!fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)){
+          outputs.push_back(si);
+        }
+      }
+    }
+  }
+  //This section needs a lock to make sure we return the final chunk only once
+  //and no results are sent after aborted is set
+  std::lock_guard<std::mutex> find_lock(shared_state->lock);
+  //Decrement the counter once since we are done with this chunk
+  shared_state->outstanding_requests--;
+  if(shared_state->outstanding_requests == 0){
+    //Send the outputs back to the user and notify that this is the final chunk
+    shared_state->handler(stat, outputs, false);
+  } else {
+    //There will be more results and we are not aborting
+    if (outputs.size() > 0 && !shared_state->aborted){
+      //Send the outputs back to the user and notify that there is more
+      bool user_wants_more = shared_state->handler(stat, outputs, true);
+      if(!user_wants_more) {
+        //Abort if user doesn't want more
+        shared_state->aborted = true;
+      }
+    }
+  }
+}
+
+void FileSystemImpl::Find(
+    const std::string &path, const std::string &name, const uint32_t maxdepth,
+    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
+  LOG_INFO(kFileSystem, << "FileSystemImpl::Find("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ", name="
+                                 << name << ") called");
+
+  //Populating the operational state, which includes:
+  //current search path, depth within the path, and the indication that we are currently searching for a path (not name yet).
+  std::shared_ptr<FindOperationalState> operational_state = std::make_shared<FindOperationalState>(path, 0, true);
+  //Populating the shared state, which includes:
+  //vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag.
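+  //(Note: outstanding_requests is seeded with 1 below to account for the initial
+  //GetListing("/") request; FindShim decrements it once per completed chunk.)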
+  std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false);
+  auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) {
+    FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state);
+  };
+  nn_.GetListing("/", callback);
+}
+
+Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
+  LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Find("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ", name="
+                                 << name << ") called");
+
+  if (!stat_infos) {
+    return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
+  }
+
+  // In this case, we're going to have the async code populate stat_infos.
+
+  std::promise<void> promise = std::promise<void>();
+  std::future<void> future(promise.get_future());
+  Status status = Status::OK();
+
+  /**
+   * Keep requesting more until we get the entire listing. Set the promise
+   * when we have the entire listing to stop.
+   *
+   * Find guarantees that the handler will only be called once at a time,
+   * so we do not need any locking here
+   */
+  auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
+    if (!si.empty()) {
+      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
+    }
+    if (!s.ok() && status.ok()){
+      //We make sure we set 'status' only on the first error.
+      status = s;
+    }
+    if (!has_more_results) {
+      promise.set_value();
+      return false;
+    }
+    return true;
+  };
+
+  Find(path, name, maxdepth, h);
+
+  /* block until promise is set */
+  future.get();
+  return status;
+}
 
 void FileSystemImpl::CreateSnapshot(const std::string &path,
                                     const std::string &name,
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 75a43f3430..0e9cedd8c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -94,9 +94,9 @@ public:
 
   void GetListing(
       const std::string &path,
-      const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) override;
+      const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
 
-  Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override;
+  Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override;
 
   virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
     const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
@@ -115,8 +115,8 @@ public:
     const std::function<void(const Status &)> &handler) override;
   virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
 
-  virtual void SetPermission(const std::string & path,
-      uint16_t permissions, const std::function<void(const Status &)> &handler) override;
+  virtual void SetPermission(const std::string & path, uint16_t permissions,
+      const std::function<void(const Status &)> &handler) override;
   virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
 
   virtual void SetOwner(const std::string & path, const std::string & username,
@@ -124,6 +124,11 @@ public:
       const std::function<void(const Status &)> &handler) override;
   virtual Status SetOwner(const std::string & path, const std::string & username,
                           const std::string & groupname) override;
 
+  void Find(
+      const std::string &path, const std::string &name, const uint32_t maxdepth,
+      const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
+  Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override;
+
 /*****************************************************************************
  *                    FILE SYSTEM SNAPSHOT FUNCTIONS
  ****************************************************************************/
@@ -204,9 +209,58 @@ private:
   **/
   std::shared_ptr<LibhdfsEvents> event_handlers_;
 
-  void GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
-                      std::string path,
-                      const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler);
+  void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
+                      std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler);
+
+  struct FindSharedState {
+    //Name pattern (can have wild-cards) to find
+    const std::string name;
+    //Maximum depth to recurse after the end of path is reached.
+    //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
+    const uint32_t maxdepth;
+    //Vector of all sub-directories from the path argument (each can have wild-cards)
+    std::vector<std::string> dirs;
+    //Callback from Find
+    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
+    //outstanding_requests is incremented once for every GetListing call.
+    std::atomic<uint64_t> outstanding_requests;
+    //Boolean needed to abort all recursion on error or on user command
+    std::atomic<bool> aborted;
+    //Shared variables will need protection with a lock
+    std::mutex lock;
+    FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
+                    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
+                    uint64_t outstanding_requests_, bool aborted_)
+        : name(name_),
+          maxdepth(maxdepth_),
+          handler(handler_),
+          outstanding_requests(outstanding_requests_),
+          aborted(aborted_),
+          lock() {
+      //Constructing the list of sub-directories
+      std::stringstream ss(path_);
+      if(path_.back() != '/'){
+        ss << "/";
+      }
+      for (std::string token; std::getline(ss, token, '/'); ) {
+        dirs.push_back(token);
+      }
+    }
+  };
+
+  struct FindOperationalState {
+    const std::string path;
+    const uint32_t depth;
+    const bool search_path;
+    FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
+        : path(path_),
+          depth(depth_),
+          search_path(search_path_) {
+    }
+  };
+
+  void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
+                bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state);
 };
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
index 27ccb5dee9..89acac3182 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
@@ -39,30 +39,6 @@ namespace hdfs {
 *                    NAMENODE OPERATIONS
 ****************************************************************************/
 
-uint16_t NameNodeOperations::GetDefaultPermissionMask() {
-  return 0755;
-}
-
-Status NameNodeOperations::CheckValidPermissionMask(uint16_t permissions) {
-  if (permissions > 01777) {
-    std::stringstream errormsg;
-    errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
-             << std::showbase << permissions << " (should be between 0 and 01777)";
-    return Status::InvalidArgument(errormsg.str().c_str());
-  }
-  return Status::OK();
-}
-
-Status NameNodeOperations::CheckValidReplication(uint16_t replication) {
-  if (replication < 1 || replication > 
512) { - std::stringstream errormsg; - errormsg << "CheckValidReplication: argument 'replication' is " - << replication << " (should be between 1 and 512)"; - return Status::InvalidArgument(errormsg.str().c_str()); - } - return Status::OK(); -} - void NameNodeOperations::Connect(const std::string &cluster_name, const std::vector &servers, std::function &&handler) { @@ -170,7 +146,7 @@ void NameNodeOperations::SetReplication(const std::string & path, int16_t replic handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); return; } - Status replStatus = CheckValidReplication(replication); + Status replStatus = FileSystemImpl::CheckValidReplication(replication); if (!replStatus.ok()) { handler(replStatus); return; @@ -252,7 +228,8 @@ void NameNodeOperations::GetFileInfo(const std::string & path, // no fs in the protobuf. if(resp -> has_fs()){ struct StatInfo stat_info; - stat_info.path=path; + stat_info.path = path; + stat_info.full_path = path; HdfsFileStatusProtoToStatInfo(stat_info, resp->fs()); handler(stat, stat_info); } else { @@ -290,7 +267,7 @@ void NameNodeOperations::GetFsStats( void NameNodeOperations::GetListing( const std::string & path, - std::function> &, bool)> handler, + std::function &, bool)> handler, const std::string & start_after) { using ::hadoop::hdfs::GetListingRequestProto; using ::hadoop::hdfs::GetListingResponseProto; @@ -300,8 +277,8 @@ void NameNodeOperations::GetListing( << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); if (path.empty()) { - std::shared_ptr> stat_infos; - handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), stat_infos, false); + std::vector empty; + handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false); return; } @@ -312,31 +289,26 @@ void NameNodeOperations::GetListing( auto resp = std::make_shared(); - namenode_.GetListing( - &req, - resp, - [resp, handler, path](const Status &stat) { - if (stat.ok()) { - if(resp -> has_dirlist()){ - std::shared_ptr> stat_infos(new std::vector); - for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { - StatInfo si; - si.path=fs.path(); - HdfsFileStatusProtoToStatInfo(si, fs); - stat_infos->push_back(si); - } - handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); - } else { - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - std::shared_ptr> stat_infos; - handler(statNew, stat_infos, false); - } - } else { - std::shared_ptr> stat_infos; - handler(stat, stat_infos, false); + namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) { + std::vector stat_infos; + if (stat.ok()) { + if(resp -> has_dirlist()){ + for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { + StatInfo si; + si.path = fs.path(); + si.full_path = path + fs.path() + "/"; + HdfsFileStatusProtoToStatInfo(si, fs); + stat_infos.push_back(si); } - }); + handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); + } else { + std::string errormsg = "No such file or directory: " + path; + handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false); + } + } else { + handler(stat, stat_infos, false); + } + }); } void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, @@ -355,7 +327,7 @@ void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, } MkdirsRequestProto req; - 
Status permStatus = CheckValidPermissionMask(permissions); + Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); if (!permStatus.ok()) { handler(permStatus); return; @@ -471,7 +443,7 @@ void NameNodeOperations::SetPermission(const std::string & path, handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); return; } - Status permStatus = CheckValidPermissionMask(permissions); + Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); if (!permStatus.ok()) { handler(permStatus); return; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index 3afa2e9a1c..60efacc902 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -48,12 +48,6 @@ public: engine_(io_service, options, client_name, user_name, protocol_name, protocol_version), namenode_(& engine_), options_(options) {} - static uint16_t GetDefaultPermissionMask(); - - static Status CheckValidPermissionMask(uint16_t permissions); - - static Status CheckValidReplication(uint16_t replication); - void Connect(const std::string &cluster_name, const std::vector &servers, std::function &&handler); @@ -77,7 +71,7 @@ public: // start_after="" for initial call void GetListing(const std::string & path, - std::function>&, bool)> handler, + std::function &, bool)> handler, const std::string & start_after = ""); void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc index 4d497283fc..9534204c92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc @@ -33,21 +33,27 @@ TEST(ConfigurationTest, TestDegenerateInputs) { /* Completely empty stream */ { std::stringstream stream; - optional config = ConfigurationLoader().Load(""); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(""); EXPECT_FALSE(config && "Empty stream"); } /* No values */ { std::string data = ""; - optional config = ConfigurationLoader().Load(data); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(data); EXPECT_TRUE(config && "Blank config"); } /* Extraneous values */ { std::string data = ""; - optional config = ConfigurationLoader().Load(data); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(data); EXPECT_TRUE(config && "Extraneous values"); } } @@ -57,7 +63,9 @@ TEST(ConfigurationTest, TestBasicOperations) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -74,7 +82,9 @@ TEST(ConfigurationTest, 
TestBasicOperations) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); EXPECT_EQ("value1", config->GetWithDefault("KEY1", "")); } @@ -83,7 +93,9 @@ TEST(ConfigurationTest, TestBasicOperations) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional value = config->Get("key1"); EXPECT_TRUE((bool)value); @@ -97,7 +109,9 @@ TEST(ConfigurationTest, TestCompactValues) { std::stringstream stream; stream << ""; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Compact value parse"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -108,7 +122,9 @@ TEST(ConfigurationTest, TestMultipleResources) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -129,7 +145,9 @@ TEST(ConfigurationTest, TestStringResource) { simpleConfigStream(stream, "key1", "value1"); std::string str = stream.str(); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -171,7 +189,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "key1value1false"; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -187,7 +207,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "key1value1true"; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -203,7 +225,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << ""; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -219,7 +243,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << ""; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = 
config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -235,7 +261,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "key1value1spam"; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -251,7 +279,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "key1value1"; - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -271,7 +301,9 @@ TEST(ConfigurationTest, TestFileReads) TempFile tempFile; writeSimpleConfig(tempFile.filename, "key1", "value1"); - optional config = ConfigurationLoader().LoadFromFile(tempFile.filename); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.LoadFromFile(tempFile.filename); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -298,7 +330,9 @@ TEST(ConfigurationTest, TestFileReads) { TempDir tempDir; - optional config = ConfigurationLoader().LoadFromFile(tempDir.path); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.LoadFromFile(tempDir.path); EXPECT_FALSE(config && "Add directory as file resource"); } @@ -359,7 +393,9 @@ TEST(ConfigurationTest, TestIntConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "1"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional value = config->GetInt("key1"); EXPECT_TRUE((bool)value); @@ -398,7 +434,9 @@ TEST(ConfigurationTest, TestDoubleConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "1"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional value = config->GetDouble("key1"); EXPECT_TRUE((bool)value); @@ -441,7 +479,9 @@ TEST(ConfigurationTest, TestBoolConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "true"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional value = config->GetBool("key1"); EXPECT_TRUE((bool)value); @@ -488,7 +528,9 @@ TEST(ConfigurationTest, TestUriConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "hdfs:///"); - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional value = config->GetUri("key1"); EXPECT_TRUE((bool)value); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h index 669557f0f3..7947ff5e2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h @@ -54,7 +54,9 @@ template optional simpleConfig(Args... args) { std::stringstream stream; simpleConfigStream(stream, args...); - optional parse = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional parse = config_loader.Load(stream.str()); EXPECT_TRUE((bool)parse); return parse; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc index 7e9ca666cc..360f886c12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc @@ -30,7 +30,9 @@ TEST(HdfsConfigurationTest, TestDefaultOptions) { // Completely empty stream { - HdfsConfiguration empty_config = ConfigurationLoader().New(); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + HdfsConfiguration empty_config = config_loader.NewConfig(); Options options = empty_config.GetOptions(); EXPECT_EQ(Options::kDefaultRpcTimeout, options.rpc_timeout); } @@ -49,8 +51,9 @@ TEST(HdfsConfigurationTest, TestSetOptions) HdfsConfiguration::kIpcClientConnectTimeoutKey, 103, HdfsConfiguration::kHadoopSecurityAuthenticationKey, HdfsConfiguration::kHadoopSecurityAuthentication_kerberos ); - - optional config = ConfigurationLoader().Load(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional config = config_loader.Load(stream.str()); EXPECT_TRUE(config && "Read stream"); Options options = config->GetOptions(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c index fd82da310a..ddba67f404 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c @@ -492,6 +492,10 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations) { return libhdfspp_hdfsFreeBlockLocations(locations); } +hdfsFileInfo *hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t *numEntries) { + return (hdfsFileInfo *)libhdfspp_hdfsFind(fs->libhdfsppRep, path, name, numEntries); +} + int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) { return libhdfspp_hdfsCreateSnapshot(fs->libhdfsppRep, path, name); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h index 481ed680f4..644ff13b88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h 
@@ -94,6 +94,7 @@ #undef hdfsCancel #undef hdfsGetBlockLocations #undef hdfsFreeBlockLocations +#undef hdfsFind #undef hdfsCreateSnapshot #undef hdfsDeleteSnapshot #undef hdfsAllowSnapshot diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h index a1e4483489..c186d63d60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h @@ -94,6 +94,7 @@ #define hdfsCancel libhdfspp_hdfsCancel #define hdfsGetBlockLocations libhdfspp_hdfsGetBlockLocations #define hdfsFreeBlockLocations libhdfspp_hdfsFreeBlockLocations +#define hdfsFind libhdfspp_hdfsFind #define hdfsCreateSnapshot libhdfspp_hdfsCreateSnapshot #define hdfsDeleteSnapshot libhdfspp_hdfsDeleteSnapshot #define hdfsAllowSnapshot libhdfspp_hdfsAllowSnapshot diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt new file mode 100644 index 0000000000..f0817ebfd6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default LIBHDFSPP_DIR to the default install location. You can override +# it by adding -DLIBHDFSPP_DIR=... 
to your cmake invocation +set(LIBHDFSPP_DIR ${CMAKE_INSTALL_PREFIX} CACHE STRING "Path to the installed libhdfs++") + +include_directories( ${LIBHDFSPP_DIR}/include ) +link_directories( ${LIBHDFSPP_DIR}/lib ) + +add_library(tools_common_obj OBJECT tools_common.cpp) +add_library(tools_common $<TARGET_OBJECTS:tools_common_obj>) + +add_executable(hdfs_cat hdfs_cat.cpp) +target_link_libraries(hdfs_cat tools_common hdfspp) + +add_executable(hdfs_chgrp hdfs_chgrp.cpp) +target_link_libraries(hdfs_chgrp tools_common hdfspp) + +add_executable(hdfs_chown hdfs_chown.cpp) +target_link_libraries(hdfs_chown tools_common hdfspp) + +add_executable(hdfs_chmod hdfs_chmod.cpp) +target_link_libraries(hdfs_chmod tools_common hdfspp) + +add_executable(hdfs_find hdfs_find.cpp) +target_link_libraries(hdfs_find tools_common hdfspp) \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_cat.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_cat.cpp new file mode 100644 index 0000000000..166a7bfef1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_cat.cpp @@ -0,0 +1,120 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include <google/protobuf/stubs/common.h> +#include <unistd.h> +#include "tools_common.h" + +void usage(){ + std::cout << "Usage: hdfs_cat [OPTION] FILE" + << std::endl + << std::endl << "Concatenate FILE to standard output." + << std::endl + << std::endl << " -h display this help and exit" + << std::endl + << std::endl << "Examples:" + << std::endl << "hdfs_cat hdfs://localhost.localdomain:9433/dir/file" + << std::endl << "hdfs_cat /dir/file" + << std::endl; +} + +#define BUF_SIZE 4096 + +int main(int argc, char *argv[]) { + if (argc != 2) { + usage(); + exit(EXIT_FAILURE); + } + + int input; + + //Using GetOpt to read in the values + opterr = 0; + while ((input = getopt(argc, argv, "h")) != -1) { + switch (input) + { + case 'h': + usage(); + exit(EXIT_SUCCESS); + break; + case '?': + if (isprint(optopt)) + std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl; + else + std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl; + usage(); + exit(EXIT_FAILURE); + default: + exit(EXIT_FAILURE); + } + } + + std::string uri_path = argv[optind]; + + //Building a URI object from the given uri_path + hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path); + if (!uri) { + std::cerr << "Malformed URI: " << uri_path << std::endl; + exit(EXIT_FAILURE); + } + + //TODO: HDFS-9539 Currently options can be returned empty + hdfs::Options options = *hdfs::getOptions(); + + std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options); + if (!fs) { + std::cerr << "Could not connect the file system. 
" << std::endl; + exit(EXIT_FAILURE); + } + + hdfs::FileHandle *file_raw = nullptr; + hdfs::Status status = fs->Open(uri->get_path(), &file_raw); + if (!status.ok()) { + std::cerr << "Could not open file " << uri->get_path() << ". " << status.ToString() << std::endl; + exit(EXIT_FAILURE); + } + //wrapping file_raw into a unique pointer to guarantee deletion + std::unique_ptr file(file_raw); + + char input_buffer[BUF_SIZE]; + ssize_t total_bytes_read = 0; + size_t last_bytes_read = 0; + + do{ + //Reading file chunks + status = file->Read(input_buffer, sizeof(input_buffer), &last_bytes_read); + if(status.ok()) { + //Writing file chunks to stdout + fwrite(input_buffer, last_bytes_read, 1, stdout); + total_bytes_read += last_bytes_read; + } else { + if(status.is_invalid_offset()){ + //Reached the end of the file + break; + } else { + std::cerr << "Error reading the file: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); + } + } + } while (last_bytes_read > 0); + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chgrp.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chgrp.cpp new file mode 100644 index 0000000000..2bb6843c3d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chgrp.cpp @@ -0,0 +1,196 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include +#include +#include +#include "tools_common.h" + +void usage(){ + std::cout << "Usage: hdfs_chgrp [OPTION] GROUP FILE" + << std::endl + << std::endl << "Change the group association of each FILE to GROUP." + << std::endl << "The user must be the owner of files. 
Additional information is in the Permissions Guide:" + << std::endl << "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html" + << std::endl + << std::endl << " -R operate on files and directories recursively" + << std::endl << " -h display this help and exit" + << std::endl + << std::endl << "Examples:" + << std::endl << "hdfs_chgrp -R new_group hdfs://localhost.localdomain:9433/dir/file" + << std::endl << "hdfs_chgrp new_group /dir/file" + << std::endl; +} + +struct SetOwnerState { + const std::string username; + const std::string groupname; + const std::function<void(const hdfs::Status &)> handler; + //The request counter is incremented once every time the async SetOwner call is made + uint64_t request_counter; + //This boolean will be set when find returns the last result + bool find_is_done; + //Final status to be returned + hdfs::Status status; + //Shared variables will need protection with a lock + std::mutex lock; + SetOwnerState(const std::string & username_, const std::string & groupname_, + const std::function<void(const hdfs::Status &)> & handler_, + uint64_t request_counter_, bool find_is_done_) + : username(username_), + groupname(groupname_), + handler(handler_), + request_counter(request_counter_), + find_is_done(find_is_done_), + status(), + lock() { + } +}; + +int main(int argc, char *argv[]) { + //We should have 3 or 4 parameters + if (argc != 3 && argc != 4) { + usage(); + exit(EXIT_FAILURE); + } + + bool recursive = false; + int input; + + //Using GetOpt to read in the values + opterr = 0; + while ((input = getopt(argc, argv, "Rh")) != -1) { + switch (input) + { + case 'R': + recursive = 1; + break; + case 'h': + usage(); + exit(EXIT_SUCCESS); + break; + case '?': + if (isprint(optopt)) + std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl; + else + std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl; + usage(); + exit(EXIT_FAILURE); + default: + exit(EXIT_FAILURE); + } + } + std::string group = argv[optind]; + //Owner stays the same, just group association changes. + std::string owner = ""; + std::string uri_path = argv[optind + 1]; + + //Building a URI object from the given uri_path + hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path); + if (!uri) { + std::cerr << "Malformed URI: " << uri_path << std::endl; + exit(EXIT_FAILURE); + } + + //TODO: HDFS-9539 Currently options can be returned empty + hdfs::Options options = *hdfs::getOptions(); + + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits<int>::max(); + + std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), options); + if (!fs) { + std::cerr << "Could not connect the file system. " << std::endl; + exit(EXIT_FAILURE); + } + + /* wrap async FileSystem::SetOwner with promise to make it a blocking call */ + std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>(); + std::future<hdfs::Status> future(promise->get_future()); + auto handler = [promise](const hdfs::Status &s) { + promise->set_value(s); + }; + + if(!recursive){ + fs->SetOwner(uri->get_path(), owner, group, handler); + } + else { + //Allocating shared state, which includes: + //username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done + std::shared_ptr<SetOwnerState> state = std::make_shared<SetOwnerState>(owner, group, handler, 0, false); + + // Keep requesting more from Find until we process the entire listing. Call handler when Find is done and the request counter is 0. 
+ // Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind. + auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector & stat_infos, bool has_more_results) -> bool { + + //For each result returned by Find we call async SetOwner with the handler below. + //SetOwner DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetOwner. + auto handlerSetOwner = [state](const hdfs::Status &status_set_owner) { + std::lock_guard guard(state->lock); + + //Decrement the counter once since we are done with this async call + if (!status_set_owner.ok() && state->status.ok()){ + //We make sure we set state->status only on the first error. + state->status = status_set_owner; + } + state->request_counter--; + if(state->request_counter == 0 && state->find_is_done){ + state->handler(state->status); //exit + } + }; + if(!stat_infos.empty() && state->status.ok()) { + for (hdfs::StatInfo const& s : stat_infos) { + //Launch an asynchronous call to SetOwner for every returned result + state->request_counter++; + fs->SetOwner(s.full_path, state->username, state->groupname, handlerSetOwner); + } + } + + //Lock this section because handlerSetOwner might be accessing the same + //shared variables simultaneously + std::lock_guard guard(state->lock); + if (!status_find.ok() && state->status.ok()){ + //We make sure we set state->status only on the first error. + state->status = status_find; + } + if(!has_more_results){ + state->find_is_done = true; + if(state->request_counter == 0){ + state->handler(state->status); //exit + } + return false; + } + return true; + }; + + //Asynchronous call to Find + fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind); + } + + /* block until promise is set */ + hdfs::Status status = future.get(); + if (!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); + } + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chmod.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chmod.cpp new file mode 100644 index 0000000000..0a001d6f75 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chmod.cpp @@ -0,0 +1,194 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include +#include +#include +#include "tools_common.h" + +void usage(){ + std::cout << "Usage: hdfs_chmod [OPTION] FILE" + << std::endl + << std::endl << "Change the permissions of each FILE to MODE." 
+ << std::endl << "The user must be the owner of the file, or else a super-user." + << std::endl << "Additional information is in the Permissions Guide:" + << std::endl << "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html" + << std::endl + << std::endl << " -R operate on files and directories recursively" + << std::endl << " -h display this help and exit" + << std::endl + << std::endl << "Examples:" + << std::endl << "hdfs_chmod -R 755 hdfs://localhost.localdomain:9433/dir/file" + << std::endl << "hdfs_chmod 777 /dir/file" + << std::endl; +} + +struct SetPermissionState { + const uint16_t permissions; + const std::function handler; + //The request counter is incremented once every time SetOwner async call is made + uint64_t request_counter; + //This boolean will be set when find returns the last result + bool find_is_done; + //Final status to be returned + hdfs::Status status; + //Shared variables will need protection with a lock + std::mutex lock; + SetPermissionState(const uint16_t permissions_, const std::function & handler_, + uint64_t request_counter_, bool find_is_done_) + : permissions(permissions_), + handler(handler_), + request_counter(request_counter_), + find_is_done(find_is_done_), + status(), + lock() { + } +}; + +int main(int argc, char *argv[]) { + //We should have 3 or 4 parameters + if (argc != 3 && argc != 4) { + usage(); + exit(EXIT_FAILURE); + } + + bool recursive = false; + int input; + + //Using GetOpt to read in the values + opterr = 0; + while ((input = getopt(argc, argv, "Rh")) != -1) { + switch (input) + { + case 'R': + recursive = 1; + break; + case 'h': + usage(); + exit(EXIT_SUCCESS); + break; + case '?': + if (isprint(optopt)) + std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl; + else + std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl; + usage(); + exit(EXIT_FAILURE); + default: + exit(EXIT_FAILURE); + } + } + std::string permissions = argv[optind]; + std::string uri_path = argv[optind + 1]; + + //Building a URI object from the given uri_path + hdfs::optional uri = hdfs::URI::parse_from_string(uri_path); + if (!uri) { + std::cerr << "Malformed URI: " << uri_path << std::endl; + exit(EXIT_FAILURE); + } + + //TODO: HDFS-9539 Currently options can be returned empty + hdfs::Options options = *hdfs::getOptions(); + + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits::max(); + + std::shared_ptr fs = hdfs::doConnect(uri.value(), options); + if (!fs) { + std::cerr << "Could not connect the file system. " << std::endl; + exit(EXIT_FAILURE); + } + + /* wrap async FileSystem::SetPermission with promise to make it a blocking call */ + std::shared_ptr> promise = std::make_shared>(); + std::future future(promise->get_future()); + auto handler = [promise](const hdfs::Status &s) { + promise->set_value(s); + }; + + //strtol() is reading the value with base 8, NULL because we are reading in just one value. + uint16_t perm = strtol(permissions.c_str(), NULL, 8); + if(!recursive){ + fs->SetPermission(uri->get_path(), perm, handler); + } + else { + //Allocating shared state, which includes: + //username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done + std::shared_ptr state = std::make_shared(perm, handler, 0, false); + + // Keep requesting more from Find until we process the entire listing. 
Call handler when Find is done and the request counter is 0. + // Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind. + auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool { + + //For each result returned by Find we call async SetPermission with the handler below. + //SetPermission DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetPermission. + auto handlerSetPermission = [state](const hdfs::Status &status_set_permission) { + std::lock_guard<std::mutex> guard(state->lock); + + //Decrement the counter once since we are done with this async call + if (!status_set_permission.ok() && state->status.ok()){ + //We make sure we set state->status only on the first error. + state->status = status_set_permission; + } + state->request_counter--; + if(state->request_counter == 0 && state->find_is_done){ + state->handler(state->status); //exit + } + }; + if(!stat_infos.empty() && state->status.ok()) { + for (hdfs::StatInfo const& s : stat_infos) { + //Launch an asynchronous call to SetPermission for every returned result + state->request_counter++; + fs->SetPermission(s.full_path, state->permissions, handlerSetPermission); + } + } + + //Lock this section because handlerSetPermission might be accessing the same + //shared variables simultaneously + std::lock_guard<std::mutex> guard(state->lock); + if (!status_find.ok() && state->status.ok()){ + //We make sure we set state->status only on the first error. + state->status = status_find; + } + if(!has_more_results){ + state->find_is_done = true; + if(state->request_counter == 0){ + state->handler(state->status); //exit + } + return false; + } + return true; + }; + + //Asynchronous call to Find + fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind); + } + + /* block until promise is set */ + hdfs::Status status = future.get(); + if (!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); + } + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chown.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chown.cpp new file mode 100644 index 0000000000..08724c6f84 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_chown.cpp @@ -0,0 +1,206 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+*/ + +#include +#include +#include +#include "tools_common.h" + +void usage(){ + std::cout << "Usage: hdfs_chown [OPTION] [OWNER][:[GROUP]] FILE" + << std::endl + << std::endl << "Change the owner and/or group of each FILE to OWNER and/or GROUP." + << std::endl << "The user must be a super-user. Additional information is in the Permissions Guide:" + << std::endl << "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html" + << std::endl + << std::endl << " -R operate on files and directories recursively" + << std::endl << " -h display this help and exit" + << std::endl + << std::endl << "Owner is unchanged if missing. Group is unchanged if missing." + << std::endl << "OWNER and GROUP may be numeric as well as symbolic." + << std::endl + << std::endl << "Examples:" + << std::endl << "hdfs_chown -R new_owner:new_group hdfs://localhost.localdomain:9433/dir/file" + << std::endl << "hdfs_chown new_owner /dir/file" + << std::endl; +} + +struct SetOwnerState { + const std::string username; + const std::string groupname; + const std::function handler; + //The request counter is incremented once every time SetOwner async call is made + uint64_t request_counter; + //This boolean will be set when find returns the last result + bool find_is_done; + //Final status to be returned + hdfs::Status status; + //Shared variables will need protection with a lock + std::mutex lock; + SetOwnerState(const std::string & username_, const std::string & groupname_, + const std::function & handler_, + uint64_t request_counter_, bool find_is_done_) + : username(username_), + groupname(groupname_), + handler(handler_), + request_counter(request_counter_), + find_is_done(find_is_done_), + status(), + lock() { + } +}; + +int main(int argc, char *argv[]) { + //We should have 3 or 4 parameters + if (argc != 3 && argc != 4) { + usage(); + exit(EXIT_FAILURE); + } + + bool recursive = false; + int input; + + //Using GetOpt to read in the values + opterr = 0; + while ((input = getopt(argc, argv, "Rh")) != -1) { + switch (input) + { + case 'R': + recursive = 1; + break; + case 'h': + usage(); + exit(EXIT_SUCCESS); + break; + case '?': + if (isprint(optopt)) + std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl; + else + std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl; + usage(); + exit(EXIT_FAILURE); + default: + exit(EXIT_FAILURE); + } + } + std::string owner_and_group = argv[optind]; + std::string uri_path = argv[optind + 1]; + + std::string owner, group; + size_t owner_end = owner_and_group.find(":"); + if(owner_end == std::string::npos) { + owner = owner_and_group; + } else { + owner = owner_and_group.substr(0, owner_end); + group = owner_and_group.substr(owner_end + 1); + } + + //Building a URI object from the given uri_path + hdfs::optional uri = hdfs::URI::parse_from_string(uri_path); + if (!uri) { + std::cerr << "Malformed URI: " << uri_path << std::endl; + exit(EXIT_FAILURE); + } + + //TODO: HDFS-9539 Currently options can be returned empty + hdfs::Options options = *hdfs::getOptions(); + + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits::max(); + + std::shared_ptr fs = hdfs::doConnect(uri.value(), options); + if (!fs) { + std::cerr << "Could not connect the file system. 
" << std::endl; + exit(EXIT_FAILURE); + } + + /* wrap async FileSystem::SetOwner with promise to make it a blocking call */ + std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>(); + std::future<hdfs::Status> future(promise->get_future()); + auto handler = [promise](const hdfs::Status &s) { + promise->set_value(s); + }; + + if(!recursive){ + fs->SetOwner(uri->get_path(), owner, group, handler); + } + else { + //Allocating shared state, which includes: + //username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done + std::shared_ptr<SetOwnerState> state = std::make_shared<SetOwnerState>(owner, group, handler, 0, false); + + // Keep requesting more from Find until we process the entire listing. Call handler when Find is done and the request counter is 0. + // Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind. + auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool { + + //For each result returned by Find we call async SetOwner with the handler below. + //SetOwner DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetOwner. + auto handlerSetOwner = [state](const hdfs::Status &status_set_owner) { + std::lock_guard<std::mutex> guard(state->lock); + + //Decrement the counter once since we are done with this async call + if (!status_set_owner.ok() && state->status.ok()){ + //We make sure we set state->status only on the first error. + state->status = status_set_owner; + } + state->request_counter--; + if(state->request_counter == 0 && state->find_is_done){ + state->handler(state->status); //exit + } + }; + if(!stat_infos.empty() && state->status.ok()) { + for (hdfs::StatInfo const& s : stat_infos) { + //Launch an asynchronous call to SetOwner for every returned result + state->request_counter++; + fs->SetOwner(s.full_path, state->username, state->groupname, handlerSetOwner); + } + } + + //Lock this section because handlerSetOwner might be accessing the same + //shared variables simultaneously + std::lock_guard<std::mutex> guard(state->lock); + if (!status_find.ok() && state->status.ok()){ + //We make sure we set state->status only on the first error. + state->status = status_find; + } + if(!has_more_results){ + state->find_is_done = true; + if(state->request_counter == 0){ + state->handler(state->status); //exit + } + return false; + } + return true; + }; + + //Asynchronous call to Find + fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind); + } + + /* block until promise is set */ + hdfs::Status status = future.get(); + if (!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); + } + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_find.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_find.cpp new file mode 100644 index 0000000000..eca79c6ce2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_find.cpp @@ -0,0 +1,156 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include +#include +#include +#include "tools_common.h" + +void usage(){ + std::cout << "Usage: hdfs_find [OPTION] PATH" + << std::endl + << std::endl << "Finds all files recursively starting from the" + << std::endl << "specified PATH and prints their file paths." + << std::endl << "This hdfs_find tool mimics the POSIX find." + << std::endl + << std::endl << "Both PATH and NAME can have wild-cards." + << std::endl + << std::endl << " -n NAME if provided all results will be matching the NAME pattern" + << std::endl << " otherwise, the implicit '*' will be used" + << std::endl << " NAME allows wild-cards" + << std::endl + << std::endl << " -m MAX_DEPTH if provided the maximum depth to recurse after the end of" + << std::endl << " the path is reached will be limited by MAX_DEPTH" + << std::endl << " otherwise, the maximum depth to recurse is unbound" + << std::endl << " MAX_DEPTH can be set to 0 for pure globbing and ignoring" + << std::endl << " the NAME option (no recursion after the end of the path)" + << std::endl + << std::endl << " -h display this help and exit" + << std::endl + << std::endl << "Examples:" + << std::endl << "hdfs_find hdfs://localhost.localdomain:9433/dir?/tree* -n some?file*name" + << std::endl << "hdfs_find / -n file_name -m 3" + << std::endl; +} + +int main(int argc, char *argv[]) { + //We should have at least 2 arguments + if (argc < 2) { + usage(); + exit(EXIT_FAILURE); + } + + int input; + //If NAME is not specified we use implicit "*" + std::string name = "*"; + //If MAX_DEPTH is not specified we use the max value of uint_32_t + uint32_t max_depth = hdfs::FileSystem::GetDefaultFindMaxDepth(); + + //Using GetOpt to read in the values + opterr = 0; + while ((input = getopt(argc, argv, "hn:m:")) != -1) { + switch (input) + { + case 'h': + usage(); + exit(EXIT_SUCCESS); + break; + case 'n': + name = optarg; + break; + case 'm': + max_depth = std::stoi(optarg); + break; + case '?': + if (optopt == 'n' || optopt == 'm') + std::cerr << "Option -" << (char) optopt << " requires an argument." << std::endl; + else if (isprint(optopt)) + std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl; + else + std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl; + usage(); + exit(EXIT_FAILURE); + default: + exit(EXIT_FAILURE); + } + } + std::string uri_path = argv[optind]; + + //Building a URI object from the given uri_path + hdfs::optional uri = hdfs::URI::parse_from_string(uri_path); + if (!uri) { + std::cerr << "Malformed URI: " << uri_path << std::endl; + exit(EXIT_FAILURE); + } + + //TODO: HDFS-9539 Currently options can be returned empty + hdfs::Options options = *hdfs::getOptions(); + + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits::max(); + + std::shared_ptr fs = hdfs::doConnect(uri.value(), options); + if (!fs) { + std::cerr << "Could not connect the file system. 
" << std::endl; + exit(EXIT_FAILURE); + } + + std::promise promise; + std::future future(promise.get_future()); + hdfs::Status status = hdfs::Status::OK(); + + /** + * Keep requesting more until we get the entire listing. Set the promise + * when we have the entire listing to stop. + * + * Find guarantees that the handler will only be called once at a time, + * so we do not need any locking here + */ + auto handler = [&promise, &status] + (const hdfs::Status &s, const std::vector & si, bool has_more_results) -> bool { + //Print result chunks as they arrive + if(!si.empty()) { + for (hdfs::StatInfo const& s : si) { + std::cout << s.full_path << std::endl; + } + } + if(!s.ok() && status.ok()){ + //We make sure we set 'status' only on the first error. + status = s; + } + if (!has_more_results) { + promise.set_value(); //set promise + return false; //request stop sending results + } + return true; //request more results + }; + + //Asynchronous call to Find + fs->Find(uri->get_path(), name, max_depth, handler); + + //block until promise is set + future.get(); + if(!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + } + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp new file mode 100644 index 0000000000..af882ce840 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp @@ -0,0 +1,70 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#include "tools_common.h" + +namespace hdfs { + + std::shared_ptr getOptions() { + std::shared_ptr options = std::make_shared(); + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional config = loader.LoadDefaultResources(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. + if(config){ + //Loading options from the config + *options = config->GetOptions(); + } + return options; + } + + std::shared_ptr doConnect(hdfs::URI & uri, hdfs::Options & options) { + IoService * io_service = IoService::New(); + //Wrapping fs into a shared pointer to guarantee deletion + std::shared_ptr fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not create FileSystem object. 
" << std::endl; + exit(EXIT_FAILURE); + } + Status status; + //Check if the user supplied the host + if(!uri.get_host().empty()){ + //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs. + std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : ""; + status = fs->Connect(uri.get_host(), port); + if (!status.ok()) { + std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl; + exit(EXIT_FAILURE); + } + } else { + status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); + } + } + return fs; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h new file mode 100644 index 0000000000..858fc4b8ed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h @@ -0,0 +1,39 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +#ifndef TOOLS_COMMON_H_ +#define TOOLS_COMMON_H_ + +#include "hdfspp/hdfspp.h" +#include "common/hdfs_configuration.h" +#include "common/configuration_loader.h" + +#include + +namespace hdfs { + + //Pull configurations and get the Options object + std::shared_ptr getOptions(); + + //Build all necessary objects and perform the connection + std::shared_ptr doConnect(hdfs::URI & uri, hdfs::Options & options); + +} + +#endif
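
A few of the patterns these tools lean on, reduced to standalone sketches; they are illustrations, not part of the patch itself.

Every tool drives the asynchronous FileSystem API from a synchronous main() the same way: the completion handler does nothing but fulfill a std::promise, and the caller blocks on the matching std::future. A minimal sketch of that pattern using only the standard library (AsyncSetOwner and its int status code are illustrative stand-ins, not libhdfs++ API):

    #include <cstdlib>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <thread>

    //Stand-in for an async API in the style of FileSystem::SetOwner:
    //it returns immediately and invokes the callback from another thread.
    void AsyncSetOwner(const std::string &path,
                       std::function<void(int)> callback) {
      std::thread([path, callback] { callback(0 /*ok*/); }).detach();
    }

    int main() {
      auto promise = std::make_shared<std::promise<int>>();
      std::future<int> future = promise->get_future();

      //The handler only fulfills the promise; the shared_ptr keeps the
      //promise alive even if the callback fires after this scope unwinds.
      AsyncSetOwner("/dir/file",
                    [promise](int status) { promise->set_value(status); });

      int status = future.get(); //block until the async call completes
      std::cout << "status = " << status << std::endl;
      return (status == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
    }

The shared_ptr around the promise mirrors the tools: the handler must stay copyable, and the promise must outlive the stack frame that created it.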
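The recursive mode of hdfs_chgrp, hdfs_chmod and hdfs_chown layers a second pattern on top: Find streams results in batches, every result launches another async call, and a request counter plus a find-is-done flag decide who fires the final handler, exactly once, whether the last async call or the last Find batch finishes last. A reduced sketch of that bookkeeping, with plain threads standing in for the async calls (BatchState and OnRequestDone are illustrative names):

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>

    //Tracks a batch of asynchronous requests; fires the completion handler
    //exactly once, after the producer is done and every request finished.
    struct BatchState {
      std::function<void(bool)> handler; //final completion callback
      uint64_t request_counter = 0;      //one increment per launched request
      bool producer_is_done = false;     //no further requests will be launched
      bool ok = true;                    //sticky flag, records the first failure
      std::mutex lock;                   //guards all of the above
    };

    //Called once from every worker when its request completes.
    void OnRequestDone(const std::shared_ptr<BatchState> &state, bool ok) {
      std::lock_guard<std::mutex> guard(state->lock);
      if (!ok && state->ok) {
        state->ok = false; //keep only the first error, like state->status above
      }
      state->request_counter--;
      if (state->request_counter == 0 && state->producer_is_done) {
        state->handler(state->ok); //last one out reports the result
      }
    }

    int main() {
      auto state = std::make_shared<BatchState>();
      state->handler = [](bool ok) {
        std::cout << (ok ? "all requests succeeded" : "at least one failed") << std::endl;
      };

      std::vector<std::thread> workers;
      for (int i = 0; i < 8; i++) {
        {
          std::lock_guard<std::mutex> guard(state->lock);
          state->request_counter++; //register the request before launching it
        }
        workers.emplace_back([state, i] { OnRequestDone(state, i != 3); });
      }
      {
        //Mirrors the end of handlerFind: mark the producer done and fire the
        //handler ourselves if every request has already drained.
        std::lock_guard<std::mutex> guard(state->lock);
        state->producer_is_done = true;
        if (state->request_counter == 0) {
          state->handler(state->ok);
        }
      }
      for (std::thread &t : workers) {
        t.join();
      }
      return 0;
    }

The same structure explains why handlerFind needs no lock for its own invocations (Find calls it one batch at a time) yet must still lock before touching the counters that the per-result handlers also update.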
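All five tools parse flags with POSIX getopt(3) and share the same error-reporting shape. A compilable skeleton of that loop (the -R and -n flags and the usage text are illustrative, not the exact interface of any one tool):

    #include <cctype>
    #include <iostream>
    #include <string>
    #include <unistd.h>

    int main(int argc, char *argv[]) {
      bool recursive = false;
      std::string name = "*";
      int input;

      opterr = 0; //suppress getopt's own messages; we print our own
      while ((input = getopt(argc, argv, "Rhn:")) != -1) {
        switch (input) {
          case 'R':
            recursive = true;
            break;
          case 'h':
            std::cout << "usage: demo [-R] [-n NAME] PATH" << std::endl;
            return 0;
          case 'n':
            name = optarg;
            break;
          case '?':
            if (optopt == 'n')
              std::cerr << "Option -n requires an argument." << std::endl;
            else if (isprint(optopt))
              std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
            else
              std::cerr << "Unknown option character." << std::endl;
            return 1;
          default:
            return 1;
        }
      }
      if (optind >= argc) {
        std::cerr << "missing PATH" << std::endl;
        return 1;
      }
      std::cout << "recursive=" << recursive << " name=" << name
                << " path=" << argv[optind] << std::endl;
      return 0;
    }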
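hdfs_cat's read loop is the classic chunked-copy shape: read up to BUF_SIZE bytes, write whatever arrived, repeat until the reads dry up. The only HDFS-specific wrinkle is the end-of-file test, which arrives as an invalid-offset Status rather than a zero-byte read. The same loop against stdio for comparison (a local file reports EOF as a short read instead):

    #include <cstdio>

    #define BUF_SIZE 4096

    //Copy a local file (or stdin) to stdout in BUF_SIZE chunks,
    //mirroring the structure of hdfs_cat's FileHandle::Read loop.
    int main(int argc, char *argv[]) {
      std::FILE *in = (argc == 2) ? std::fopen(argv[1], "rb") : stdin;
      if (!in) {
        std::perror("fopen");
        return 1;
      }

      char input_buffer[BUF_SIZE];
      size_t last_bytes_read = 0;
      do {
        last_bytes_read = std::fread(input_buffer, 1, sizeof(input_buffer), in);
        if (last_bytes_read > 0) {
          std::fwrite(input_buffer, 1, last_bytes_read, stdout);
        }
      } while (last_bytes_read > 0);

      if (in != stdin) {
        std::fclose(in);
      }
      return 0;
    }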
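Finally, FindSharedState's constructor pre-splits the path argument into its components so FindShim can glob one level per recursion step. The same tokenization in isolation (SplitPath is an illustrative name; note the leading empty token that an absolute path produces):

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    //Splits "/dir1/dir2*/f?le" into ["", "dir1", "dir2*", "f?le"].
    //The trailing '/' is appended to a copy before tokenizing, matching
    //the normalization done in the FindSharedState constructor.
    std::vector<std::string> SplitPath(std::string path) {
      if (path.empty() || path.back() != '/') {
        path += "/";
      }
      std::vector<std::string> dirs;
      std::stringstream ss(path);
      for (std::string token; std::getline(ss, token, '/'); ) {
        dirs.push_back(token);
      }
      return dirs;
    }

    int main() {
      for (const std::string &dir : SplitPath("/dir1/dir2*/f?le")) {
        std::cout << "[" << dir << "]" << std::endl;
      }
      return 0;
    }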