diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/CMakeLists.txt
index 75c5ad1ff9..bec9343903 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/CMakeLists.txt
@@ -38,6 +38,7 @@ add_executable(hdfs_tool_tests
                hdfs-get-mock.cc
                hdfs-find-mock.cc
                hdfs-ls-mock.cc
+               hdfs-setrep-mock.cc
                main.cc)
 target_include_directories(hdfs_tool_tests PRIVATE
     ../tools
@@ -60,6 +61,7 @@ target_include_directories(hdfs_tool_tests PRIVATE
     ../../tools/hdfs-get
     ../../tools/hdfs-find
     ../../tools/hdfs-ls
+    ../../tools/hdfs-setrep
     ../../tools/hdfs-cat)
 target_link_libraries(hdfs_tool_tests PRIVATE
     gmock_main
@@ -81,5 +83,6 @@ target_link_libraries(hdfs_tool_tests PRIVATE
     hdfs_get_lib
     hdfs_find_lib
     hdfs_ls_lib
+    hdfs_setrep_lib
     hdfs_cat_lib)
 add_test(hdfs_tool_tests hdfs_tool_tests)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.cc
new file mode 100644
index 0000000000..d33f49b6ae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.cc
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "hdfs-setrep-mock.h"
+#include "hdfs-tool-tests.h"
+
+namespace hdfs::tools::test {
+SetrepMock::~SetrepMock() = default;
+
+void SetrepMock::SetExpectations(
+    std::function<std::unique_ptr<SetrepMock>()> test_case,
+    const std::vector<std::string> &args) const {
+  // Get the pointer to the function that defines the test case
+  const auto test_case_func =
+      test_case.target<std::unique_ptr<SetrepMock> (*)()>();
+  ASSERT_NE(test_case_func, nullptr);
+
+  // Set the expected method calls and their corresponding arguments for each
+  // test case
+  if (*test_case_func == &CallHelp<SetrepMock>) {
+    EXPECT_CALL(*this, HandleHelp()).Times(1).WillOnce(testing::Return(true));
+    return;
+  }
+
+  if (*test_case_func == &PassPermissionsAndAPath<SetrepMock>) {
+    const auto number = args[0];
+    const auto path = args[1];
+    EXPECT_CALL(*this, HandlePath(path, number))
+        .Times(1)
+        .WillOnce(testing::Return(true));
+  }
+}
+} // namespace hdfs::tools::test
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.h
new file mode 100644
index 0000000000..db1e0960ae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.h
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_TOOLS_HDFS_SETREP_MOCK
+#define LIBHDFSPP_TOOLS_HDFS_SETREP_MOCK
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+
+#include "hdfs-setrep.h"
+
+namespace hdfs::tools::test {
+/**
+ * {@class SetrepMock} is an {@class Setrep} that mocks the
+ * HandleHelp and HandlePath methods for testing their functionality.
+ */
+class SetrepMock : public hdfs::tools::Setrep {
+public:
+  /**
+   * {@inheritdoc}
+   */
+  SetrepMock(const int argc, char **argv) : Setrep(argc, argv) {}
+
+  // Abiding to the Rule of 5
+  SetrepMock(const SetrepMock &) = delete;
+  SetrepMock(SetrepMock &&) = delete;
+  SetrepMock &operator=(const SetrepMock &) = delete;
+  SetrepMock &operator=(SetrepMock &&) = delete;
+  ~SetrepMock() override;
+
+  /**
+   * Defines the methods and the corresponding arguments that are expected
+   * to be called on this instance of {@link HdfsTool} for the given test case.
+   *
+   * @param test_case An {@link std::function} object that points to the
+   * function defining the test case
+   * @param args The arguments that are passed to this test case
+   */
+  void SetExpectations(std::function<std::unique_ptr<SetrepMock>()> test_case,
+                       const std::vector<std::string> &args = {}) const;
+
+  MOCK_METHOD(bool, HandleHelp, (), (const, override));
+
+  MOCK_METHOD(bool, HandlePath, (const std::string &, const std::string &),
+              (const, override));
+};
+} // namespace hdfs::tools::test
+
+#endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-tool-tests.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-tool-tests.cc
index 97169293f4..1bdf82f9af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-tool-tests.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-tool-tests.cc
@@ -38,6 +38,7 @@
 #include "hdfs-move-to-local-mock.h"
 #include "hdfs-rename-snapshot-mock.h"
 #include "hdfs-rm-mock.h"
+#include "hdfs-setrep-mock.h"
 #include "hdfs-tool-test-fixtures.h"
 #include "hdfs-tool-tests.h"
 
@@ -156,6 +157,11 @@ INSTANTIATE_TEST_SUITE_P(
                     PassMOptPermissionsAndAPath,
                     PassNOptAndAPath));
 
+INSTANTIATE_TEST_SUITE_P(
+    HdfsSetrep, HdfsToolBasicTest,
+    testing::Values(CallHelp<hdfs::tools::test::SetrepMock>,
+                    PassPermissionsAndAPath<hdfs::tools::test::SetrepMock>));
+
 // Negative tests
 INSTANTIATE_TEST_SUITE_P(
     HdfsAllowSnapshot, HdfsToolNegativeTestThrows,
@@ -245,6 +251,20 @@ INSTANTIATE_TEST_SUITE_P(
                     PassMOpt,
                     PassNOpt));
 
+INSTANTIATE_TEST_SUITE_P(
+    HdfsChgrp, HdfsToolNegativeTestThrows,
+    testing::Values(PassNOptAndAPath<hdfs::tools::test::ChgrpMock>));
+
+INSTANTIATE_TEST_SUITE_P(
+    HdfsSetrep, HdfsToolNegativeTestThrows,
+    testing::Values(
+        Pass3Paths<hdfs::tools::test::SetrepMock>,
+        PassRecursiveOwnerAndAPath<hdfs::tools::test::SetrepMock>,
+        PassRecursive<hdfs::tools::test::SetrepMock>,
+        PassMPOptsPermissionsAndAPath<hdfs::tools::test::SetrepMock>,
+        PassMOpt<hdfs::tools::test::SetrepMock>,
+        PassNOpt<hdfs::tools::test::SetrepMock>));
+
 INSTANTIATE_TEST_SUITE_P(
     HdfsRm, HdfsToolNegativeTestNoThrow,
     testing::Values(PassRecursive));
@@ -302,5 +322,5 @@ INSTANTIATE_TEST_SUITE_P(
     testing::Values(PassAPath));
 
 INSTANTIATE_TEST_SUITE_P(
-    HdfsChgrp, HdfsToolNegativeTestThrows,
-    testing::Values(PassNOptAndAPath<hdfs::tools::test::ChgrpMock>));
+    HdfsSetrep, HdfsToolNegativeTestNoThrow,
+    testing::Values(PassAPath<hdfs::tools::test::SetrepMock>));
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt
index 7cbbe49b55..0d9a684c8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/CMakeLists.txt
@@ -64,8 +64,7 @@ add_subdirectory(hdfs-copy-to-local)
 
 add_subdirectory(hdfs-move-to-local)
 
-add_executable(hdfs_setrep hdfs_setrep.cc)
-target_link_libraries(hdfs_setrep tools_common hdfspp_static)
+add_subdirectory(hdfs-setrep)
 
 add_subdirectory(hdfs-allow-snapshot)
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/CMakeLists.txt
new file mode 100644
index 0000000000..a0d8bafa63
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/CMakeLists.txt
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(hdfs_setrep_lib STATIC $<TARGET_OBJECTS:hdfs_tool_obj> hdfs-setrep.cc)
+target_include_directories(hdfs_setrep_lib PRIVATE ../../tools ${Boost_INCLUDE_DIRS})
+target_link_libraries(hdfs_setrep_lib PRIVATE Boost::boost Boost::program_options tools_common hdfspp_static)
+
+add_executable(hdfs_setrep main.cc)
+target_include_directories(hdfs_setrep PRIVATE ../../tools)
+target_link_libraries(hdfs_setrep PRIVATE hdfs_setrep_lib)
+
+install(TARGETS hdfs_setrep RUNTIME DESTINATION bin)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.cc
new file mode 100644
index 0000000000..542659b29f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.cc
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <future>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+
+#include "hdfs-setrep.h"
+#include "internal/set-replication-state.h"
+#include "tools_common.h"
+
+namespace hdfs::tools {
+Setrep::Setrep(const int argc, char **argv) : HdfsTool(argc, argv) {}
+
+bool Setrep::Initialize() {
+  auto add_options = opt_desc_.add_options();
+  add_options("help,h",
+              "Changes the replication factor of a file at PATH. If PATH is a "
+              "directory then the command recursively changes the replication "
+              "factor of all files under the directory tree rooted at PATH.");
+  add_options(
+      "replication-factor", po::value<std::string>(),
+      "The replication factor to set for the given path and its children.");
+  add_options("path", po::value<std::string>(),
+              "The path for which the replication factor needs to be set.");
+
+  // We allow only two positional arguments for this tool: the replication
+  // factor followed by the path. An exception is thrown for extra arguments.
+  pos_opt_desc_.add("replication-factor", 1);
+  pos_opt_desc_.add("path", 1);
+
+  po::store(po::command_line_parser(argc_, argv_)
+                .options(opt_desc_)
+                .positional(pos_opt_desc_)
+                .run(),
+            opt_val_);
+  po::notify(opt_val_);
+  return true;
+}
+
+bool Setrep::ValidateConstraints() const {
+  // Only "help" is allowed as single argument.
+  if (argc_ == 2) {
+    return opt_val_.count("help");
+  }
+
+  // Rest of the cases must contain more than 2 arguments on the command line.
+  return argc_ > 2;
+}
+
+std::string Setrep::GetDescription() const {
+  std::stringstream desc;
+  desc << "Usage: hdfs_setrep [OPTION] NUM_REPLICAS PATH" << std::endl
+       << std::endl
+       << "Changes the replication factor of a file at PATH. If PATH is a "
+          "directory then the command"
+       << std::endl
+       << "recursively changes the replication factor of all files under the "
+          "directory tree rooted at PATH."
+       << std::endl
+       << std::endl
+       << "  -h  display this help and exit" << std::endl
+       << std::endl
+       << "Examples:" << std::endl
+       << "hdfs_setrep 5 hdfs://localhost.localdomain:8020/dir/file"
+       << std::endl
+       << "hdfs_setrep 3 /dir1/dir2" << std::endl;
+  return desc.str();
+}
+
+bool Setrep::Do() {
+  if (!Initialize()) {
+    std::cerr << "Unable to initialize HDFS setrep tool" << std::endl;
+    return false;
+  }
+
+  if (!ValidateConstraints()) {
+    std::cout << GetDescription();
+    return false;
+  }
+
+  if (opt_val_.count("help") > 0) {
+    return HandleHelp();
+  }
+
+  if (opt_val_.count("path") > 0 && opt_val_.count("replication-factor") > 0) {
+    const auto replication_factor =
+        opt_val_["replication-factor"].as<std::string>();
+    const auto path = opt_val_["path"].as<std::string>();
+    return HandlePath(path, replication_factor);
+  }
+
+  return false;
+}
+
+bool Setrep::HandleHelp() const {
+  std::cout << GetDescription();
+  return true;
+}
+
+bool Setrep::HandlePath(const std::string &path,
+                        const std::string &replication_factor) const {
+  // Building a URI object from the given path.
+  auto uri = hdfs::parse_path_or_exit(path);
+
+  const auto fs = hdfs::doConnect(uri, true);
+  if (!fs) {
+    std::cerr << "Could not connect to the file system." << std::endl;
+    return false;
+  }
+
+  /*
+   * Wrap async FileSystem::SetReplication with promise to make it a blocking
+   * call.
+   */
+  auto promise = std::make_shared<std::promise<hdfs::Status>>();
+  std::future<hdfs::Status> future(promise->get_future());
+  auto handler = [promise](const hdfs::Status &s) { promise->set_value(s); };
+
+  const auto replication = static_cast<uint16_t>(
+      std::strtol(replication_factor.c_str(), nullptr, 8));
+  /*
+   * Allocating shared state, which includes:
+   * replication to be set, handler to be called, request counter, and a boolean
+   * to keep track if find is done
+   */
+  auto state =
+      std::make_shared<SetReplicationState>(replication, handler, 0, false);
+
+  /*
+   * Keep requesting more from Find until we process the entire listing. Call
+   * handler when Find is done and request counter is 0. Find guarantees that
+   * the handler will only be called once at a time so we do not need locking in
+   * handler_find.
+   */
+  auto handler_find = [fs, state](const hdfs::Status &status_find,
+                                  const std::vector<hdfs::StatInfo> &stat_infos,
+                                  const bool has_more_results) -> bool {
+    /*
+     * For each result returned by Find we call async SetReplication with the
+     * handler below. SetReplication DOES NOT guarantee that the handler will
+     * only be called once at a time, so we DO need locking in
+     * handler_set_replication.
+     */
+    auto handler_set_replication =
+        [state](const hdfs::Status &status_set_replication) {
+          std::lock_guard<std::mutex> guard(state->lock);
+
+          // Decrement the counter once since we are done with this async call.
+          if (!status_set_replication.ok() && state->status.ok()) {
+            // We make sure we set state->status only on the first error.
+            state->status = status_set_replication;
+          }
+          state->request_counter--;
+          if (state->request_counter == 0 && state->find_is_done) {
+            state->handler(state->status); // Exit.
+          }
+        };
+    if (!stat_infos.empty() && state->status.ok()) {
+      for (hdfs::StatInfo const &stat_info : stat_infos) {
+        // Launch an asynchronous call to SetReplication for every returned
+        // file.
+        if (stat_info.file_type == hdfs::StatInfo::IS_FILE) {
+          state->request_counter++;
+          fs->SetReplication(stat_info.full_path, state->replication,
+                             handler_set_replication);
+        }
+      }
+    }
+
+    /*
+     * Lock this section because handler_set_replication might be accessing
+     * the same shared variables simultaneously.
+     */
+    std::lock_guard<std::mutex> guard(state->lock);
+    if (!status_find.ok() && state->status.ok()) {
+      // We make sure we set state->status only on the first error.
+      state->status = status_find;
+    }
+    if (!has_more_results) {
+      state->find_is_done = true;
+      if (state->request_counter == 0) {
+        state->handler(state->status); // Exit.
+      }
+      return false;
+    }
+    return true;
+  };
+
+  // Asynchronous call to Find.
+  fs->Find(uri.get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(),
+           handler_find);
+
+  // Block until promise is set.
+  const auto status = future.get();
+  if (!status.ok()) {
+    std::cerr << "Error: " << status.ToString() << std::endl;
+    return false;
+  }
+  return true;
+}
+} // namespace hdfs::tools
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.h
new file mode 100644
index 0000000000..20ee7405b6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.h
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_TOOLS_HDFS_SETREP
+#define LIBHDFSPP_TOOLS_HDFS_SETREP
+
+#include <string>
+
+#include <boost/program_options.hpp>
+
+#include "hdfs-tool.h"
+
+namespace hdfs::tools {
+/**
+ * {@class Setrep} is an {@class HdfsTool} that changes the replication factor
+ * of a file at a given path. If the path is a directory, then it recursively
+ * changes the replication factor of all files under the directory tree rooted
+ * at the given path.
+ */
+class Setrep : public HdfsTool {
+public:
+  /**
+   * {@inheritdoc}
+   */
+  Setrep(int argc, char **argv);
+
+  // Abiding to the Rule of 5
+  Setrep(const Setrep &) = default;
+  Setrep(Setrep &&) = default;
+  Setrep &operator=(const Setrep &) = delete;
+  Setrep &operator=(Setrep &&) = delete;
+  ~Setrep() override = default;
+
+  /**
+   * {@inheritdoc}
+   */
+  [[nodiscard]] std::string GetDescription() const override;
+
+  /**
+   * {@inheritdoc}
+   */
+  [[nodiscard]] bool Do() override;
+
+protected:
+  /**
+   * {@inheritdoc}
+   */
+  [[nodiscard]] bool Initialize() override;
+
+  /**
+   * {@inheritdoc}
+   */
+  [[nodiscard]] bool ValidateConstraints() const override;
+
+  /**
+   * {@inheritdoc}
+   */
+  [[nodiscard]] bool HandleHelp() const override;
+
+  /**
+   * Handle the path argument that's passed to this tool.
+   *
+   * @param path The path for which the replication factor needs to be set.
+   * @param replication_factor The replication factor to set for the given
+   * path and its children.
+   *
+   * @return A boolean indicating the result of this operation.
+   */
+  [[nodiscard]] virtual bool
+  HandlePath(const std::string &path,
+             const std::string &replication_factor) const;
+
+private:
+  /**
+   * A boost data-structure containing the description of positional arguments
+   * passed to the command-line.
+   */
+  po::positional_options_description pos_opt_desc_;
+};
+} // namespace hdfs::tools
+#endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/main.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/main.cc
new file mode 100644
index 0000000000..a3d8399c57
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/main.cc
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdlib>
+#include <exception>
+#include <iostream>
+
+#include <google/protobuf/stubs/common.h>
+
+#include "hdfs-setrep.h"
+
+int main(int argc, char *argv[]) {
+  const auto result = std::atexit([]() -> void {
+    // Clean up static data on exit and prevent valgrind memory leaks
+    google::protobuf::ShutdownProtobufLibrary();
+  });
+  if (result != 0) {
+    std::cerr << "Error: Unable to schedule clean-up tasks for HDFS setrep "
+                 "tool, exiting"
+              << std::endl;
+    std::exit(EXIT_FAILURE);
+  }
+
+  hdfs::tools::Setrep setrep(argc, argv);
+  auto success = false;
+
+  try {
+    success = setrep.Do();
+  } catch (const std::exception &e) {
+    std::cerr << "Error: " << e.what() << std::endl;
+  }
+
+  if (!success) {
+    std::exit(EXIT_FAILURE);
+  }
+  return 0;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_setrep.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_setrep.cc
deleted file mode 100644
index 019e24d63f..0000000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_setrep.cc
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-*/
-
-#include <google/protobuf/stubs/common.h>
-#include <unistd.h>
-#include <future>
-#include "tools_common.h"
-
-void usage(){
-  std::cout << "Usage: hdfs_setrep [OPTION] NUM_REPLICAS PATH"
-      << std::endl
-      << std::endl << "Changes the replication factor of a file at PATH. If PATH is a directory then the command"
-      << std::endl << "recursively changes the replication factor of all files under the directory tree rooted at PATH."
-      << std::endl
-      << std::endl << "  -h        display this help and exit"
-      << std::endl
-      << std::endl << "Examples:"
-      << std::endl << "hdfs_setrep 5 hdfs://localhost.localdomain:8020/dir/file"
-      << std::endl << "hdfs_setrep 3 /dir1/dir2"
-      << std::endl;
-}
-
-struct SetReplicationState {
-  const uint16_t replication;
-  const std::function<void(const hdfs::Status &)> handler;
-  //The request counter is incremented once every time SetReplication async call is made
-  uint64_t request_counter;
-  //This boolean will be set when find returns the last result
-  bool find_is_done;
-  //Final status to be returned
-  hdfs::Status status;
-  //Shared variables will need protection with a lock
-  std::mutex lock;
-  SetReplicationState(const uint16_t replication_, const std::function<void(const hdfs::Status &)> & handler_,
-              uint64_t request_counter_, bool find_is_done_)
-      : replication(replication_),
-        handler(handler_),
-        request_counter(request_counter_),
-        find_is_done(find_is_done_),
-        status(),
-        lock() {
-  }
-};
-
-int main(int argc, char *argv[]) {
-  //We should have 3 or 4 parameters
-  if (argc < 3) {
-    usage();
-    exit(EXIT_FAILURE);
-  }
-
-  int input;
-
-  //Using GetOpt to read in the values
-  opterr = 0;
-  while ((input = getopt(argc, argv, "h")) != -1) {
-    switch (input)
-    {
-    case 'h':
-      usage();
-      exit(EXIT_SUCCESS);
-    case '?':
-      if (isprint(optopt))
-        std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
-      else
-        std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
-      usage();
-      exit(EXIT_FAILURE);
-    default:
-      exit(EXIT_FAILURE);
-    }
-  }
-  std::string repl = argv[optind];
-  std::string uri_path = argv[optind + 1];
-
-  //Building a URI object from the given uri_path
-  hdfs::URI uri = hdfs::parse_path_or_exit(uri_path);
-
-  std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri, true);
-  if (!fs) {
-    std::cerr << "Could not connect the file system. " << std::endl;
-    exit(EXIT_FAILURE);
-  }
-
-  /* wrap async FileSystem::SetReplication with promise to make it a blocking call */
-  std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
-  std::future<hdfs::Status> future(promise->get_future());
-  auto handler = [promise](const hdfs::Status &s) {
-    promise->set_value(s);
-  };
-
-  uint16_t replication = std::stoi(repl.c_str(), NULL, 8);
-  //Allocating shared state, which includes:
-  //replication to be set, handler to be called, request counter, and a boolean to keep track if find is done
-  std::shared_ptr<SetReplicationState> state = std::make_shared<SetReplicationState>(replication, handler, 0, false);
-
-  // Keep requesting more from Find until we process the entire listing. Call handler when Find is done and reques counter is 0.
-  // Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
-  auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
-
-    //For each result returned by Find we call async SetReplication with the handler below.
-    //SetReplication DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetReplication.
-    auto handlerSetReplication = [state](const hdfs::Status &status_set_replication) {
-      std::lock_guard<std::mutex> guard(state->lock);
-
-      //Decrement the counter once since we are done with this async call
-      if (!status_set_replication.ok() && state->status.ok()){
-        //We make sure we set state->status only on the first error.
-        state->status = status_set_replication;
-      }
-      state->request_counter--;
-      if(state->request_counter == 0 && state->find_is_done){
-        state->handler(state->status); //exit
-      }
-    };
-    if(!stat_infos.empty() && state->status.ok()) {
-      for (hdfs::StatInfo const& s : stat_infos) {
-        //Launch an asynchronous call to SetReplication for every returned file
-        if(s.file_type == hdfs::StatInfo::IS_FILE){
-          state->request_counter++;
-          fs->SetReplication(s.full_path, state->replication, handlerSetReplication);
-        }
-      }
-    }
-
-    //Lock this section because handlerSetReplication might be accessing the same
-    //shared variables simultaneously
-    std::lock_guard<std::mutex> guard(state->lock);
-    if (!status_find.ok() && state->status.ok()){
-      //We make sure we set state->status only on the first error.
-      state->status = status_find;
-    }
-    if(!has_more_results){
-      state->find_is_done = true;
-      if(state->request_counter == 0){
-        state->handler(state->status); //exit
-      }
-      return false;
-    }
-    return true;
-  };
-
-  //Asynchronous call to Find
-  fs->Find(uri.get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
-
-  /* block until promise is set */
-  hdfs::Status status = future.get();
-  if (!status.ok()) {
-    std::cerr << "Error: " << status.ToString() << std::endl;
-    exit(EXIT_FAILURE);
-  }
-
-  // Clean up static data and prevent valgrind memory leaks
-  google::protobuf::ShutdownProtobufLibrary();
-  return 0;
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/internal/set-replication-state.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/internal/set-replication-state.h
new file mode 100644
index 0000000000..5d432eddbf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/internal/set-replication-state.h
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_TOOLS_HDFS_SET_REPLICATION_STATE
+#define LIBHDFSPP_TOOLS_HDFS_SET_REPLICATION_STATE
+
+#include <functional>
+#include <mutex>
+
+#include "hdfspp/hdfspp.h"
+
+namespace hdfs::tools {
+/**
+ * {@class SetReplicationState} helps in handling the intermediate results
+ * while running {@link Setrep}.
+ */
+struct SetReplicationState {
+  SetReplicationState(const uint16_t replication,
+                      std::function<void(const hdfs::Status &)> handler,
+                      const uint64_t request_counter, const bool find_is_done)
+      : replication{replication}, handler{std::move(handler)},
+        request_counter{request_counter}, find_is_done{find_is_done} {}
+
+  /**
+   * The replication factor.
+   */
+  const uint16_t replication;
+
+  /**
+   * Handle the given {@link hdfs::Status}.
+   */
+  const std::function<void(const hdfs::Status &)> handler;
+
+  /**
+   * The request counter is incremented once every time SetReplication async
+   * call is made.
+   */
+  uint64_t request_counter;
+
+  /**
+   * This boolean will be set when find returns the last result.
+   */
+  bool find_is_done;
+
+  /**
+   * Final status to be returned.
+   */
+  hdfs::Status status;
+
+  /**
+   * Shared variables will need protection with a lock.
+   */
+  std::mutex lock;
+};
+} // namespace hdfs::tools
+
+#endif