HDFS-10595: libhdfs++: deconflate client name and client id
This commit is contained in:
parent
d22e4b2eb7
commit
69cb05d292
@ -21,6 +21,10 @@
|
|||||||
|
|
||||||
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
|
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
|
#include <sstream>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
||||||
@ -56,25 +60,17 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL
|
|||||||
|
|
||||||
|
|
||||||
std::string GetRandomClientName() {
|
std::string GetRandomClientName() {
|
||||||
/**
|
std::vector<unsigned char>buf(8);
|
||||||
* The server is requesting a 16-byte UUID:
|
RAND_pseudo_bytes(&buf[0], 8);
|
||||||
* https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
|
|
||||||
*
|
|
||||||
* This function generates a 16-byte UUID (version 4):
|
|
||||||
* https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29
|
|
||||||
**/
|
|
||||||
unsigned char buf[16];
|
|
||||||
RAND_pseudo_bytes(buf, sizeof(buf));
|
|
||||||
|
|
||||||
//clear the first four bits of byte 6 then set the second bit
|
std::ostringstream oss;
|
||||||
buf[6] = (buf[6] & 0x0f) | 0x40;
|
oss << "DFSClient_" << getpid() << "_" <<
|
||||||
|
std::this_thread::get_id() << "_" <<
|
||||||
|
std::setw(2) << std::hex << std::uppercase << std::setfill('0');
|
||||||
|
for (unsigned char b: buf)
|
||||||
|
oss << static_cast<unsigned>(b);
|
||||||
|
|
||||||
//clear the second bit of byte 8 and set the first bit
|
return oss.str();
|
||||||
buf[8] = (buf[8] & 0xbf) | 0x80;
|
|
||||||
|
|
||||||
std::stringstream ss;
|
|
||||||
ss << std::string(reinterpret_cast<char *>(buf), sizeof(buf));
|
|
||||||
return ss.str();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
|
std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
|
||||||
|
@ -102,7 +102,7 @@ static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
|
|||||||
rpc_header->set_callid(call_id);
|
rpc_header->set_callid(call_id);
|
||||||
if (retry_count != kNoRetry)
|
if (retry_count != kNoRetry)
|
||||||
rpc_header->set_retrycount(retry_count);
|
rpc_header->set_retrycount(retry_count);
|
||||||
rpc_header->set_clientid(engine->client_name());
|
rpc_header->set_clientid(engine->client_id());
|
||||||
|
|
||||||
req_header->set_methodname(method_name);
|
req_header->set_methodname(method_name);
|
||||||
req_header->set_declaringclassprotocolname(engine->protocol_name());
|
req_header->set_declaringclassprotocolname(engine->protocol_name());
|
||||||
|
@ -145,6 +145,7 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
|
|||||||
: io_service_(io_service),
|
: io_service_(io_service),
|
||||||
options_(options),
|
options_(options),
|
||||||
client_name_(client_name),
|
client_name_(client_name),
|
||||||
|
client_id_(getRandomClientId()),
|
||||||
protocol_name_(protocol_name),
|
protocol_name_(protocol_name),
|
||||||
protocol_version_(protocol_version),
|
protocol_version_(protocol_version),
|
||||||
call_id_(0),
|
call_id_(0),
|
||||||
@ -206,6 +207,28 @@ std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &opt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string RpcEngine::getRandomClientId()
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* The server is requesting a 16-byte UUID:
|
||||||
|
* https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
|
||||||
|
*
|
||||||
|
* This function generates a 16-byte UUID (version 4):
|
||||||
|
* https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29
|
||||||
|
**/
|
||||||
|
std::vector<unsigned char>buf(16);
|
||||||
|
RAND_pseudo_bytes(&buf[0], buf.size());
|
||||||
|
|
||||||
|
//clear the first four bits of byte 6 then set the second bit
|
||||||
|
buf[6] = (buf[6] & 0x0f) | 0x40;
|
||||||
|
|
||||||
|
//clear the second bit of byte 8 and set the first bit
|
||||||
|
buf[8] = (buf[8] & 0xbf) | 0x80;
|
||||||
|
return std::string(reinterpret_cast<const char*>(&buf[0]), buf.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
|
void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
|
||||||
conn_ = conn;
|
conn_ = conn;
|
||||||
retry_policy_ = std::move(MakeRetryPolicy(options_));
|
retry_policy_ = std::move(MakeRetryPolicy(options_));
|
||||||
|
@ -269,6 +269,7 @@ public:
|
|||||||
virtual int NextCallId() = 0;
|
virtual int NextCallId() = 0;
|
||||||
|
|
||||||
virtual const std::string &client_name() const = 0;
|
virtual const std::string &client_name() const = 0;
|
||||||
|
virtual const std::string &client_id() const = 0;
|
||||||
virtual const std::string &user_name() const = 0;
|
virtual const std::string &user_name() const = 0;
|
||||||
virtual const std::string &protocol_name() const = 0;
|
virtual const std::string &protocol_name() const = 0;
|
||||||
virtual int protocol_version() const = 0;
|
virtual int protocol_version() const = 0;
|
||||||
@ -380,6 +381,7 @@ class RpcEngine : public LockFreeRpcEngine {
|
|||||||
std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();
|
std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();
|
||||||
|
|
||||||
const std::string &client_name() const override { return client_name_; }
|
const std::string &client_name() const override { return client_name_; }
|
||||||
|
const std::string &client_id() const override { return client_id_; }
|
||||||
const std::string &user_name() const override { return auth_info_.getUser(); }
|
const std::string &user_name() const override { return auth_info_.getUser(); }
|
||||||
const std::string &protocol_name() const override { return protocol_name_; }
|
const std::string &protocol_name() const override { return protocol_name_; }
|
||||||
int protocol_version() const override { return protocol_version_; }
|
int protocol_version() const override { return protocol_version_; }
|
||||||
@ -394,6 +396,8 @@ protected:
|
|||||||
virtual std::shared_ptr<RpcConnection> NewConnection();
|
virtual std::shared_ptr<RpcConnection> NewConnection();
|
||||||
virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
|
virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
|
||||||
|
|
||||||
|
static std::string getRandomClientId();
|
||||||
|
|
||||||
// Remember all of the last endpoints in case we need to reconnect and retry
|
// Remember all of the last endpoints in case we need to reconnect and retry
|
||||||
std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
|
std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
|
||||||
|
|
||||||
@ -401,6 +405,7 @@ private:
|
|||||||
::asio::io_service * const io_service_;
|
::asio::io_service * const io_service_;
|
||||||
const Options options_;
|
const Options options_;
|
||||||
const std::string client_name_;
|
const std::string client_name_;
|
||||||
|
const std::string client_id_;
|
||||||
const std::string protocol_name_;
|
const std::string protocol_name_;
|
||||||
const int protocol_version_;
|
const int protocol_version_;
|
||||||
std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
|
std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
|
||||||
|
Loading…
Reference in New Issue
Block a user