From 69cb05d2920a02435921028b70ed88c587823c05 Mon Sep 17 00:00:00 2001 From: Bob Hansen Date: Fri, 30 Sep 2016 11:42:51 -0400 Subject: [PATCH] HDFS-10595: libhdfs++: deconflate client name and client id --- .../main/native/libhdfspp/lib/common/util.cc | 30 ++++++++----------- .../libhdfspp/lib/rpc/rpc_connection.cc | 2 +- .../native/libhdfspp/lib/rpc/rpc_engine.cc | 23 ++++++++++++++ .../native/libhdfspp/lib/rpc/rpc_engine.h | 5 ++++ 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc index 853ada867f..ede6acd96d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -21,6 +21,10 @@ #include #include +#include +#include +#include +#include namespace hdfs { @@ -56,25 +60,17 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL std::string GetRandomClientName() { - /** - * The server is requesting a 16-byte UUID: - * https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java - * - * This function generates a 16-byte UUID (version 4): - * https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29 - **/ - unsigned char buf[16]; - RAND_pseudo_bytes(buf, sizeof(buf)); + std::vectorbuf(8); + RAND_pseudo_bytes(&buf[0], 8); - //clear the first four bits of byte 6 then set the second bit - buf[6] = (buf[6] & 0x0f) | 0x40; + std::ostringstream oss; + oss << "DFSClient_" << getpid() << "_" << + std::this_thread::get_id() << "_" << + std::setw(2) << std::hex << std::uppercase << std::setfill('0'); + for (unsigned char b: buf) + oss << static_cast(b); - //clear the second bit of byte 8 and set the first bit - buf[8] = (buf[8] & 0xbf) | 0x80; - - std::stringstream ss; - ss << std::string(reinterpret_cast(buf), sizeof(buf)); - return ss.str(); + return oss.str(); } std::string SafeDisconnect(asio::ip::tcp::socket *sock) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index b6136c25f4..2068614fc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -102,7 +102,7 @@ static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id, rpc_header->set_callid(call_id); if (retry_count != kNoRetry) rpc_header->set_retrycount(retry_count); - rpc_header->set_clientid(engine->client_name()); + rpc_header->set_clientid(engine->client_id()); req_header->set_methodname(method_name); req_header->set_declaringclassprotocolname(engine->protocol_name()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 89d6a67104..4de67046b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -145,6 +145,7 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, : io_service_(io_service), options_(options), client_name_(client_name), + client_id_(getRandomClientId()), protocol_name_(protocol_name), protocol_version_(protocol_version), call_id_(0), @@ -206,6 +207,28 @@ std::unique_ptr RpcEngine::MakeRetryPolicy(const Options &opt } } +std::string RpcEngine::getRandomClientId() +{ + /** + * The server is requesting a 16-byte UUID: + * https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java + * + * This function generates a 16-byte UUID (version 4): + * https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29 + **/ + std::vectorbuf(16); + RAND_pseudo_bytes(&buf[0], buf.size()); + + //clear the first four bits of byte 6 then set the second bit + buf[6] = (buf[6] & 0x0f) | 0x40; + + //clear the second bit of byte 8 and set the first bit + buf[8] = (buf[8] & 0xbf) | 0x80; + return std::string(reinterpret_cast(&buf[0]), buf.size()); +} + + + void RpcEngine::TEST_SetRpcConnection(std::shared_ptr conn) { conn_ = conn; retry_policy_ = std::move(MakeRetryPolicy(options_)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index efaf407db7..9191ab276b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -269,6 +269,7 @@ public: virtual int NextCallId() = 0; virtual const std::string &client_name() const = 0; + virtual const std::string &client_id() const = 0; virtual const std::string &user_name() const = 0; virtual const std::string &protocol_name() const = 0; virtual int protocol_version() const = 0; @@ -380,6 +381,7 @@ class RpcEngine : public LockFreeRpcEngine { std::unique_ptr TEST_GenerateRetryPolicyUsingOptions(); const std::string &client_name() const override { return client_name_; } + const std::string &client_id() const override { return client_id_; } const std::string &user_name() const override { return auth_info_.getUser(); } const std::string &protocol_name() const override { return protocol_name_; } int protocol_version() const override { return protocol_version_; } @@ -394,6 +396,8 @@ protected: virtual std::shared_ptr NewConnection(); virtual std::unique_ptr MakeRetryPolicy(const Options &options); + static std::string getRandomClientId(); + // Remember all of the last endpoints in case we need to reconnect and retry std::vector<::asio::ip::tcp::endpoint> last_endpoints_; @@ -401,6 +405,7 @@ private: ::asio::io_service * const io_service_; const Options options_; const std::string client_name_; + const std::string client_id_; const std::string protocol_name_; const int protocol_version_; std::unique_ptr retry_policy_; //null --> no retry