HDFS-9738: libhdfs++: Implement simple authentication. Contributed by Bob Hansen
This commit is contained in:
parent
71b4f2ea25
commit
d807773752
@ -120,9 +120,11 @@ class FileSystem {
|
||||
* Create a new instance of the FileSystem object. The call
|
||||
* initializes the RPC connections to the NameNode and returns an
|
||||
* FileSystem object.
|
||||
*
|
||||
* If user_name is blank, the current user will be used for a default.
|
||||
**/
|
||||
static FileSystem * New(
|
||||
IoService *&io_service, const Options &options);
|
||||
IoService *&io_service, const std::string &user_name, const Options &options);
|
||||
|
||||
virtual void Connect(const std::string &server,
|
||||
const std::string &service,
|
||||
|
@ -54,6 +54,7 @@ class Status {
|
||||
kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
|
||||
kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
|
||||
kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
|
||||
kPermissionDenied = static_cast<unsigned>(std::errc::permission_denied),
|
||||
kException = 255,
|
||||
};
|
||||
|
||||
|
@ -86,6 +86,7 @@ struct hdfsBuilder {
|
||||
|
||||
std::string overrideHost;
|
||||
tPort overridePort; // 0 --> use default
|
||||
std::string user;
|
||||
|
||||
static constexpr tPort kUseDefaultPort = 0;
|
||||
static constexpr tPort kDefaultPort = 8020;
|
||||
@ -124,6 +125,12 @@ static int Error(const Status &stat) {
|
||||
case Status::Code::kOperationCanceled:
|
||||
ReportError(EINTR, "Operation canceled");
|
||||
break;
|
||||
case Status::Code::kPermissionDenied:
|
||||
if (!stat.ToString().empty())
|
||||
ReportError(EACCES, stat.ToString().c_str());
|
||||
else
|
||||
ReportError(EACCES, "Permission denied");
|
||||
break;
|
||||
default:
|
||||
ReportError(ENOSYS, "Error: unrecognised code");
|
||||
}
|
||||
@ -156,9 +163,18 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
|
||||
}
|
||||
|
||||
hdfsFS hdfsConnect(const char *nn, tPort port) {
|
||||
return hdfsConnectAsUser(nn, port, "");
|
||||
}
|
||||
|
||||
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
|
||||
std::string port_as_string = std::to_string(port);
|
||||
IoService * io_service = IoService::New();
|
||||
FileSystem *fs = FileSystem::New(io_service, Options());
|
||||
std::string user_name;
|
||||
if (user) {
|
||||
user_name = user;
|
||||
}
|
||||
|
||||
FileSystem *fs = FileSystem::New(io_service, user_name, Options());
|
||||
if (!fs) {
|
||||
return nullptr;
|
||||
}
|
||||
@ -323,6 +339,16 @@ void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
|
||||
bld->overridePort = port;
|
||||
}
|
||||
|
||||
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
||||
{
|
||||
if (userName) {
|
||||
bld->user = userName;
|
||||
} else {
|
||||
bld->user = "";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
||||
{
|
||||
delete bld;
|
||||
@ -358,7 +384,10 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
||||
{
|
||||
port = hdfsBuilder::kDefaultPort;
|
||||
}
|
||||
return hdfsConnect(bld->overrideHost.c_str(), port);
|
||||
if (bld->user.empty())
|
||||
return hdfsConnect(bld->overrideHost.c_str(), port);
|
||||
else
|
||||
return hdfsConnectAsUser(bld->overrideHost.c_str(), port, bld->user.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -20,9 +20,12 @@
|
||||
|
||||
#include <cassert>
|
||||
#include <sstream>
|
||||
#include <cstring>
|
||||
|
||||
namespace hdfs {
|
||||
|
||||
const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException";
|
||||
|
||||
Status::Status(int code, const char *msg1) : code_(code) {
|
||||
if(msg1) {
|
||||
msg_ = msg1;
|
||||
@ -58,7 +61,10 @@ Status Status::Unimplemented() {
|
||||
}
|
||||
|
||||
Status Status::Exception(const char *exception_class_name, const char *error_message) {
|
||||
return Status(kException, exception_class_name, error_message);
|
||||
if (exception_class_name && (strcmp(exception_class_name, kStatusAccessControlException) == 0) )
|
||||
return Status(kPermissionDenied, error_message);
|
||||
else
|
||||
return Status(kException, exception_class_name, error_message);
|
||||
}
|
||||
|
||||
Status Status::Error(const char *error_message) {
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <future>
|
||||
#include <tuple>
|
||||
#include <iostream>
|
||||
#include <pwd.h>
|
||||
|
||||
namespace hdfs {
|
||||
|
||||
@ -108,18 +109,48 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
|
||||
****************************************************************************/
|
||||
|
||||
FileSystem * FileSystem::New(
|
||||
IoService *&io_service, const Options &options) {
|
||||
return new FileSystemImpl(io_service, options);
|
||||
IoService *&io_service, const std::string &user_name, const Options &options) {
|
||||
return new FileSystemImpl(io_service, user_name, options);
|
||||
}
|
||||
|
||||
/*****************************************************************************
|
||||
* FILESYSTEM IMPLEMENTATION
|
||||
****************************************************************************/
|
||||
|
||||
FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options)
|
||||
const std::string get_effective_user_name(const std::string &user_name) {
|
||||
if (!user_name.empty())
|
||||
return user_name;
|
||||
|
||||
// If no user name was provided, try the HADOOP_USER_NAME and USER environment
|
||||
// variables
|
||||
const char * env = getenv("HADOOP_USER_NAME");
|
||||
if (env) {
|
||||
return env;
|
||||
}
|
||||
|
||||
env = getenv("USER");
|
||||
if (env) {
|
||||
return env;
|
||||
}
|
||||
|
||||
// If running on POSIX, use the currently logged in user
|
||||
#if defined(_POSIX_VERSION)
|
||||
uid_t uid = geteuid();
|
||||
struct passwd *pw = getpwuid(uid);
|
||||
if (pw && pw->pw_name)
|
||||
{
|
||||
return pw->pw_name;
|
||||
}
|
||||
#endif
|
||||
|
||||
return "unknown_user";
|
||||
}
|
||||
|
||||
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name,
|
||||
const Options &options)
|
||||
: io_service_(static_cast<IoServiceImpl *>(io_service)),
|
||||
nn_(&io_service_->io_service(), options,
|
||||
GetRandomClientName(), kNamenodeProtocol,
|
||||
GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol,
|
||||
kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
|
||||
bad_node_tracker_(std::make_shared<BadDataNodeTracker>())
|
||||
{
|
||||
|
@ -47,10 +47,10 @@ namespace hdfs {
|
||||
class NameNodeOperations {
|
||||
public:
|
||||
NameNodeOperations(::asio::io_service *io_service, const Options &options,
|
||||
const std::string &client_name, const char *protocol_name,
|
||||
int protocol_version) :
|
||||
const std::string &client_name, const std::string &user_name,
|
||||
const char *protocol_name, int protocol_version) :
|
||||
io_service_(io_service),
|
||||
engine_(io_service, options, client_name, protocol_name, protocol_version),
|
||||
engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
|
||||
namenode_(& engine_) {}
|
||||
|
||||
void Connect(const std::string &server,
|
||||
@ -80,7 +80,7 @@ private:
|
||||
*/
|
||||
class FileSystemImpl : public FileSystem {
|
||||
public:
|
||||
FileSystemImpl(IoService *&io_service, const Options &options);
|
||||
FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
|
||||
~FileSystemImpl() override;
|
||||
|
||||
/* attempt to connect to namenode, return bad status on failure */
|
||||
|
@ -227,6 +227,21 @@ void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
|
||||
std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
|
||||
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
|
||||
|
||||
/** From Client.java:
|
||||
*
|
||||
* Write the connection header - this is sent when connection is established
|
||||
* +----------------------------------+
|
||||
* | "hrpc" 4 bytes |
|
||||
* +----------------------------------+
|
||||
* | Version (1 byte) |
|
||||
* +----------------------------------+
|
||||
* | Service Class (1 byte) |
|
||||
* +----------------------------------+
|
||||
* | AuthProtocol (1 byte) |
|
||||
* +----------------------------------+
|
||||
*
|
||||
* AuthProtocol: 0->none, -33->SASL
|
||||
*/
|
||||
static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
|
||||
RpcEngine::kRpcVersion, 0, 0};
|
||||
auto res =
|
||||
@ -240,6 +255,10 @@ std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
|
||||
|
||||
IpcConnectionContextProto handshake;
|
||||
handshake.set_protocol(engine_->protocol_name());
|
||||
const std::string & user_name = engine()->user_name();
|
||||
if (!user_name.empty()) {
|
||||
*handshake.mutable_userinfo()->mutable_effectiveuser() = user_name;
|
||||
}
|
||||
AddHeadersToPacket(res.get(), {&h, &handshake}, nullptr);
|
||||
return res;
|
||||
}
|
||||
|
@ -28,11 +28,12 @@ template <class T>
|
||||
using optional = std::experimental::optional<T>;
|
||||
|
||||
RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
|
||||
const std::string &client_name, const char *protocol_name,
|
||||
int protocol_version)
|
||||
const std::string &client_name, const std::string &user_name,
|
||||
const char *protocol_name, int protocol_version)
|
||||
: io_service_(io_service),
|
||||
options_(options),
|
||||
client_name_(client_name),
|
||||
user_name_(user_name),
|
||||
protocol_name_(protocol_name),
|
||||
protocol_version_(protocol_version),
|
||||
retry_policy_(std::move(MakeRetryPolicy(options))),
|
||||
|
@ -206,6 +206,7 @@ public:
|
||||
virtual int NextCallId() = 0;
|
||||
|
||||
virtual const std::string &client_name() const = 0;
|
||||
virtual const std::string &user_name() const = 0;
|
||||
virtual const std::string &protocol_name() const = 0;
|
||||
virtual int protocol_version() const = 0;
|
||||
virtual ::asio::io_service &io_service() = 0;
|
||||
@ -230,8 +231,8 @@ class RpcEngine : public LockFreeRpcEngine {
|
||||
};
|
||||
|
||||
RpcEngine(::asio::io_service *io_service, const Options &options,
|
||||
const std::string &client_name, const char *protocol_name,
|
||||
int protocol_version);
|
||||
const std::string &client_name, const std::string &user_name,
|
||||
const char *protocol_name, int protocol_version);
|
||||
|
||||
void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler);
|
||||
|
||||
@ -265,6 +266,7 @@ class RpcEngine : public LockFreeRpcEngine {
|
||||
void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn);
|
||||
|
||||
const std::string &client_name() const override { return client_name_; }
|
||||
const std::string &user_name() const override { return user_name_; }
|
||||
const std::string &protocol_name() const override { return protocol_name_; }
|
||||
int protocol_version() const override { return protocol_version_; }
|
||||
::asio::io_service &io_service() override { return *io_service_; }
|
||||
@ -281,6 +283,7 @@ private:
|
||||
::asio::io_service * const io_service_;
|
||||
const Options options_;
|
||||
const std::string client_name_;
|
||||
const std::string user_name_;
|
||||
const std::string protocol_name_;
|
||||
const int protocol_version_;
|
||||
const std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
|
||||
|
@ -105,7 +105,7 @@ using namespace hdfs;
|
||||
TEST(RpcEngineTest, TestRoundTrip) {
|
||||
::asio::io_service io_service;
|
||||
Options options;
|
||||
RpcEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
RpcConnectionImpl<MockRPCConnection> *conn =
|
||||
new RpcConnectionImpl<MockRPCConnection>(&engine);
|
||||
conn->TEST_set_connected(true);
|
||||
@ -141,7 +141,7 @@ TEST(RpcEngineTest, TestRoundTrip) {
|
||||
TEST(RpcEngineTest, TestConnectionResetAndFail) {
|
||||
::asio::io_service io_service;
|
||||
Options options;
|
||||
RpcEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
RpcConnectionImpl<MockRPCConnection> *conn =
|
||||
new RpcConnectionImpl<MockRPCConnection>(&engine);
|
||||
conn->TEST_set_connected(true);
|
||||
@ -178,7 +178,7 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
|
||||
Options options;
|
||||
options.max_rpc_retries = 1;
|
||||
options.rpc_retry_delay_ms = 0;
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
|
||||
EchoResponseProto server_resp;
|
||||
server_resp.set_message("foo");
|
||||
@ -213,7 +213,7 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
|
||||
Options options;
|
||||
options.max_rpc_retries = 1;
|
||||
options.rpc_retry_delay_ms = 1;
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
|
||||
EchoResponseProto server_resp;
|
||||
server_resp.set_message("foo");
|
||||
@ -262,7 +262,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
|
||||
Options options;
|
||||
options.max_rpc_retries = 0;
|
||||
options.rpc_retry_delay_ms = 0;
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
EXPECT_CALL(*producer, Produce())
|
||||
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
|
||||
|
||||
@ -288,7 +288,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
|
||||
Options options;
|
||||
options.max_rpc_retries = 2;
|
||||
options.rpc_retry_delay_ms = 0;
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
EXPECT_CALL(*producer, Produce())
|
||||
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
|
||||
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
|
||||
@ -316,7 +316,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
|
||||
Options options;
|
||||
options.max_rpc_retries = 1;
|
||||
options.rpc_retry_delay_ms = 0;
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
EXPECT_CALL(*producer, Produce())
|
||||
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
|
||||
.WillOnce(Return(std::make_pair(::asio::error_code(), "")))
|
||||
@ -345,7 +345,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
|
||||
Options options;
|
||||
options.max_rpc_retries = 1;
|
||||
options.rpc_retry_delay_ms = 1;
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
EXPECT_CALL(*producer, Produce())
|
||||
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
|
||||
.WillOnce(Return(std::make_pair(::asio::error_code(), "")))
|
||||
@ -369,7 +369,7 @@ TEST(RpcEngineTest, TestTimeout) {
|
||||
::asio::io_service io_service;
|
||||
Options options;
|
||||
options.rpc_timeout = 1;
|
||||
RpcEngine engine(&io_service, options, "foo", "protocol", 1);
|
||||
RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
|
||||
RpcConnectionImpl<MockRPCConnection> *conn =
|
||||
new RpcConnectionImpl<MockRPCConnection>(&engine);
|
||||
conn->TEST_set_connected(true);
|
||||
|
Loading…
Reference in New Issue
Block a user