HDFS-10524: libhdfs++: Implement chmod and chown. Contributed by Anatoli Shein.

This commit is contained in:
Bob Hansen 2016-06-21 16:42:47 -04:00 committed by James Clampffer
parent 88c5768f99
commit d4c3cfbf47
9 changed files with 414 additions and 58 deletions

View File

@ -310,21 +310,14 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
//Test case: File does not exist
EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, invalid_path), ENOENT);
// Test case: No permission to access parent directory
// Trying to set permissions of the parent directory to 0
// by a super user, and then connecting as SomeGuy. Should
// receive permission denied, but receives fileInfo.
// EXPECT_ZERO(hdfsChmod(fs, paths->prefix, 0));
// EXPECT_ZERO(hdfsChmod(fs, paths->file2, 0));
// EXPECT_ZERO(hdfsDisconnect(fs));
// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "SomeGuy"));
// EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, paths->file2), EACCES);
// EXPECT_ZERO(hdfsDisconnect(fs));
// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
// if (!fs) {
// return 1;
// }
return 0;
//Test case: No permission to access parent directory
EXPECT_ZERO(hdfsChmod(fs, paths->prefix, 0));
//reconnect as user "SomeGuy" and verify that we get permission errors
hdfsFS fs2 = NULL;
EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs2, "SomeGuy"));
EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs2, paths->file2), EACCES);
EXPECT_ZERO(hdfsDisconnect(fs2));
return 0;
}
static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)

View File

@ -250,6 +250,30 @@ class FileSystem {
const std::function<void(const Status &)> &handler) = 0;
virtual Status Rename(const std::string &oldPath, const std::string &newPath) = 0;
/**
* Set permissions for an existing file/directory.
*
* @param path the path to the file or directory
* @param permissions the bitmask to set it to (should be between 0 and 01777)
*/
virtual void SetPermission(const std::string & path,
short permissions, const std::function<void(const Status &)> &handler) = 0;
virtual Status SetPermission(const std::string & path, short permissions) = 0;
/**
* Set Owner of a path (i.e. a file or a directory).
* The parameters username and groupname can be empty.
* @param path file path
* @param username If it is empty, the original username remains unchanged.
* @param groupname If it is empty, the original groupname remains unchanged.
*/
virtual void SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, const std::function<void(const Status &)> &handler) = 0;
virtual Status SetOwner(const std::string & path,
const std::string & username, const std::string & groupname) = 0;
/*****************************************************************************
* FILE SYSTEM SNAPSHOT FUNCTIONS
****************************************************************************/

View File

@ -284,6 +284,16 @@ hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
return doHdfsConnect(std::string(nn), port, std::string(user), Options());
}
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) {
//libhdfspp always returns a new instance
return doHdfsConnect(std::string(nn), port, std::string(user), Options());
}
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
//libhdfspp always returns a new instance
return hdfsConnectAsUser(nn, port, "");
}
int hdfsDisconnect(hdfsFS fs) {
try
{
@ -587,6 +597,56 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
}
}
int hdfsChmod(hdfsFS fs, const char* path, short mode){
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
if (!path) {
return Error(Status::InvalidArgument("hdfsChmod: argument 'path' cannot be NULL"));
}
Status stat = NameNodeOperations::CheckValidPermissionMask(mode);
if (!stat.ok()) {
return Error(stat);
}
stat = fs->get_impl()->SetPermission(path, mode);
if (!stat.ok()) {
return Error(stat);
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
if (!path) {
return Error(Status::InvalidArgument("hdfsChown: argument 'path' cannot be NULL"));
}
std::string own = (owner) ? owner : "";
std::string grp = (group) ? group : "";
Status stat;
stat = fs->get_impl()->SetOwner(path, own, grp);
if (!stat.ok()) {
return Error(stat);
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
try {
errno = 0;
@ -1009,6 +1069,11 @@ void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
}
}
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
//libhdfspp always returns a new instance, so nothing to do
(void)bld;
errno = 0;
}
void hdfsFreeBuilder(struct hdfsBuilder *bld)
{

View File

@ -617,6 +617,79 @@ Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &new
return stat;
}
void FileSystemImpl::SetPermission(const std::string & path,
short permissions, const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
return;
}
Status permStatus = NameNodeOperations::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
}
nn_.SetPermission(path, permissions, handler);
}
Status FileSystemImpl::SetPermission(const std::string & path, short permissions) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetPermission with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetPermission(path, permissions, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
void FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
return;
}
nn_.SetOwner(path, username, groupname, handler);
}
Status FileSystemImpl::SetOwner(const std::string & path,
const std::string & username, const std::string & groupname) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetOwner with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetOwner(path, username, groupname, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
void FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name,
const std::function<void(const Status &)> &handler) {

View File

@ -105,6 +105,15 @@ public:
const std::function<void(const Status &)> &handler) override;
virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
virtual void SetPermission(const std::string & path,
short permissions, const std::function<void(const Status &)> &handler) override;
virtual Status SetPermission(const std::string & path, short permissions) override;
virtual void SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, const std::function<void(const Status &)> &handler) override;
virtual Status SetOwner(const std::string & path,
const std::string & username, const std::string & groupname) override;
/*****************************************************************************
* FILE SYSTEM SNAPSHOT FUNCTIONS
****************************************************************************/

View File

@ -38,6 +38,18 @@ namespace hdfs {
* NAMENODE OPERATIONS
****************************************************************************/
Status NameNodeOperations::CheckValidPermissionMask(short permissions) {
if (permissions < 0 || permissions > 01777) {
std::stringstream errormsg;
errormsg << "IsValidPermissionMask: argument 'permissions' is " << std::oct
<< std::showbase << permissions << " (should be between 0 and 01777)";
//Avoid copying by binding errormsg.str() to a const reference, which extends its lifetime
const std::string& tmp = errormsg.str();
return Status::InvalidArgument(tmp.c_str());
}
return Status::OK();
}
void NameNodeOperations::Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
@ -333,6 +345,69 @@ void NameNodeOperations::Rename(const std::string & oldPath, const std::string &
});
}
void NameNodeOperations::SetPermission(const std::string & path,
short permissions, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::SetPermissionRequestProto;
using ::hadoop::hdfs::SetPermissionResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
return;
}
Status permStatus = CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
}
SetPermissionRequestProto req;
req.set_src(path);
hadoop::hdfs::FsPermissionProto *perm = req.mutable_permission();
perm->set_perm(permissions);
auto resp = std::make_shared<SetPermissionResponseProto>();
namenode_.SetPermission(&req, resp,
[handler](const Status &stat) {
handler(stat);
});
}
void NameNodeOperations::SetOwner(const std::string & path,
const std::string & username, const std::string & groupname, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::SetOwnerRequestProto;
using ::hadoop::hdfs::SetOwnerResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
return;
}
SetOwnerRequestProto req;
req.set_src(path);
if(!username.empty()) {
req.set_username(username);
}
if(!groupname.empty()) {
req.set_groupname(groupname);
}
auto resp = std::make_shared<SetOwnerResponseProto>();
namenode_.SetOwner(&req, resp,
[handler](const Status &stat) {
handler(stat);
});
}
void NameNodeOperations::CreateSnapshot(const std::string & path,
const std::string & name, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::CreateSnapshotRequestProto;

View File

@ -47,6 +47,8 @@ public:
engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
namenode_(& engine_) {}
static Status CheckValidPermissionMask(short permissions);
void Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
@ -74,6 +76,12 @@ public:
void Rename(const std::string & oldPath, const std::string & newPath,
std::function<void(const Status &)> handler);
void SetPermission(const std::string & path, short permissions,
std::function<void(const Status &)> handler);
void SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, std::function<void(const Status &)> handler);
void CreateSnapshot(const std::string & path, const std::string & name,
std::function<void(const Status &)> handler);

View File

@ -250,9 +250,54 @@ TEST_F(HdfsExtTest, TestRename) {
hdfsFreeFileInfo(dirList, 3);
}
//Testing Chmod and Chown
TEST_F(HdfsExtTest, TestChmodChown) {
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
//Path not found
std::string path = "/wrong/dir/";
EXPECT_EQ(-1, hdfsChmod(fs, path.c_str(), 0777));
EXPECT_EQ((int ) std::errc::no_such_file_or_directory, errno);
EXPECT_EQ(-1, hdfsChown(fs, path.c_str(), "foo", "bar"));
EXPECT_EQ((int ) std::errc::no_such_file_or_directory, errno);
//Wrong arguments
path = connection.newFile(1024); //1024 byte file
EXPECT_EQ(-1, hdfsChmod(fs, nullptr, 0777));
EXPECT_EQ((int ) std::errc::invalid_argument, errno);
EXPECT_EQ(-1, hdfsChmod(fs, path.c_str(), 07777));
EXPECT_EQ((int ) std::errc::invalid_argument, errno);
EXPECT_EQ(-1, hdfsChmod(fs, path.c_str(), -1));
EXPECT_EQ((int ) std::errc::invalid_argument, errno);
EXPECT_EQ(-1, hdfsChown(fs, nullptr, "foo", "bar"));
EXPECT_EQ((int ) std::errc::invalid_argument, errno);
//Permission denied
HdfsHandle connection2 = cluster.connect_c("OtherGuy");
hdfsFS fs2 = connection2.handle();
EXPECT_EQ(-1, hdfsChmod(fs2, path.c_str(), 0123));
EXPECT_EQ((int ) std::errc::permission_denied, errno);
EXPECT_EQ(-1, hdfsChown(fs2, path.c_str(), "cool", "nice"));
EXPECT_EQ((int ) std::errc::permission_denied, errno);
//Verify Chmod and Chown worked
EXPECT_EQ(0, hdfsChmod(fs, path.c_str(), 0123));
EXPECT_EQ(0, hdfsChown(fs, path.c_str(), "cool", "nice"));
hdfsFileInfo *file_info;
EXPECT_NE(nullptr, file_info = hdfsGetPathInfo(fs, path.c_str()));
EXPECT_EQ(0123, file_info->mPermissions);
EXPECT_STREQ("cool", file_info->mOwner);
EXPECT_STREQ("nice", file_info->mGroup);
hdfsFreeFileInfo(file_info, 1);
}
}
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.

View File

@ -24,17 +24,6 @@
#include <stdlib.h>
#include <string.h>
/* Cheat for now and use the same hdfsBuilder as libhdfs */
/* (libhdfspp doesn't have an hdfsBuilder yet). */
struct hdfsBuilder {
int forceNewInstance;
const char *nn;
tPort port;
const char *kerbTicketCachePath;
const char *userName;
struct hdfsBuilderConfOpt *opts;
};
/* Shim structs and functions that delegate to libhdfspp and libhdfs. */
struct hdfs_internal {
libhdfs_hdfsFS libhdfsRep;
@ -48,6 +37,11 @@ struct hdfsFile_internal {
};
typedef struct hdfsFile_internal* hdfsFile;
struct hdfsBuilder {
struct hdfsBuilder * libhdfs_builder;
struct hdfsBuilder * libhdfspp_builder;
};
#define REPORT_FUNCTION_NOT_IMPLEMENTED \
fprintf(stderr, "%s failed: function not implemented by " \
"libhdfs++ test shim", __PRETTY_FUNCTION__);
@ -78,74 +72,139 @@ void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
}
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
return (hdfsFS) libhdfspp_hdfsConnectAsUser(nn, port, user);
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
ret->libhdfsRep = libhdfs_hdfsConnectAsUser(nn, port, user);
if (!ret->libhdfsRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
ret->libhdfsppRep = libhdfspp_hdfsConnectAsUser(nn, port, user);
if (!ret->libhdfsppRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
return ret;
}
hdfsFS hdfsConnect(const char* nn, tPort port) {
REPORT_FUNCTION_NOT_IMPLEMENTED
return NULL;
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
ret->libhdfsRep = libhdfs_hdfsConnect(nn, port);
if (!ret->libhdfsRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
ret->libhdfsppRep = libhdfspp_hdfsConnect(nn, port);
if (!ret->libhdfsppRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
return ret;
}
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) {
REPORT_FUNCTION_NOT_IMPLEMENTED
return NULL;
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
ret->libhdfsRep = libhdfs_hdfsConnectAsUserNewInstance(nn, port, user);
if (!ret->libhdfsRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
ret->libhdfsppRep = libhdfspp_hdfsConnectAsUserNewInstance(nn, port, user);
if (!ret->libhdfsppRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
return ret;
}
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
REPORT_FUNCTION_NOT_IMPLEMENTED
return NULL;
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
ret->libhdfsRep = libhdfs_hdfsConnectNewInstance(nn, port);
if (!ret->libhdfsRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
ret->libhdfsppRep = libhdfspp_hdfsConnectNewInstance(nn, port);
if (!ret->libhdfsppRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
return ret;
}
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
ret->libhdfsppRep = libhdfspp_hdfsConnect(bld->nn, bld->port);
if (!ret->libhdfsppRep) {
ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld->libhdfs_builder);
if (!ret->libhdfsRep) {
free(ret);
ret = NULL;
} else {
/* Destroys bld object. */
ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld);
if (!ret->libhdfsRep) {
libhdfspp_hdfsDisconnect(ret->libhdfsppRep);
free(ret);
ret = NULL;
}
return NULL;
}
/* Destroys bld object. */
ret->libhdfsppRep = libhdfspp_hdfsBuilderConnect(bld->libhdfspp_builder);
if (!ret->libhdfsppRep) {
libhdfs_hdfsDisconnect(ret->libhdfsRep);
free(ret);
return NULL;
}
return ret;
}
struct hdfsBuilder *hdfsNewBuilder(void) {
return libhdfs_hdfsNewBuilder();
struct hdfsBuilder * result = calloc(1, sizeof(struct hdfsBuilder));
result -> libhdfs_builder = libhdfs_hdfsNewBuilder();
result -> libhdfspp_builder = libhdfspp_hdfsNewBuilder();
return result;
}
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
libhdfs_hdfsBuilderSetForceNewInstance(bld);
libhdfs_hdfsBuilderSetForceNewInstance(bld->libhdfs_builder);
libhdfspp_hdfsBuilderSetForceNewInstance(bld->libhdfspp_builder);
}
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) {
libhdfs_hdfsBuilderSetNameNode(bld, nn);
libhdfs_hdfsBuilderSetNameNode(bld->libhdfs_builder, nn);
libhdfspp_hdfsBuilderSetNameNode(bld->libhdfspp_builder, nn);
}
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) {
libhdfs_hdfsBuilderSetNameNodePort(bld, port);
libhdfs_hdfsBuilderSetNameNodePort(bld->libhdfs_builder, port);
libhdfspp_hdfsBuilderSetNameNodePort(bld->libhdfspp_builder, port);
}
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) {
libhdfs_hdfsBuilderSetUserName(bld, userName);
libhdfs_hdfsBuilderSetUserName(bld->libhdfs_builder, userName);
libhdfspp_hdfsBuilderSetUserName(bld->libhdfspp_builder, userName);
}
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
const char *kerbTicketCachePath) {
libhdfs_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
REPORT_FUNCTION_NOT_IMPLEMENTED
}
void hdfsFreeBuilder(struct hdfsBuilder *bld) {
libhdfs_hdfsFreeBuilder(bld);
libhdfs_hdfsFreeBuilder(bld->libhdfs_builder);
libhdfspp_hdfsFreeBuilder(bld->libhdfspp_builder);
free(bld);
}
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
const char *val) {
return libhdfs_hdfsBuilderConfSetStr(bld, key, val);
int ret = libhdfs_hdfsBuilderConfSetStr(bld->libhdfs_builder, key, val);
if (ret) {
return ret;
}
ret = libhdfspp_hdfsBuilderConfSetStr(bld->libhdfspp_builder, key, val);
if (ret) {
return ret;
}
return 0;
}
int hdfsConfGetStr(const char *key, char **val) {
@ -161,11 +220,16 @@ void hdfsConfStrFree(char *val) {
}
int hdfsDisconnect(hdfsFS fs) {
int ret;
libhdfspp_hdfsDisconnect(fs->libhdfsppRep);
ret = libhdfs_hdfsDisconnect(fs->libhdfsRep);
int ret1 = libhdfs_hdfsDisconnect(fs->libhdfsRep);
int ret2 = libhdfspp_hdfsDisconnect(fs->libhdfsppRep);
free(fs);
return ret;
if (ret1){
return ret1;
} else if (ret2){
return ret2;
} else {
return 0;
}
}
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
@ -337,11 +401,11 @@ tOffset hdfsGetUsed(hdfsFS fs) {
int hdfsChown(hdfsFS fs, const char* path, const char *owner,
const char *group) {
return libhdfs_hdfsChown(fs->libhdfsRep, path, owner, group);
return libhdfspp_hdfsChown(fs->libhdfsppRep, path, owner, group);
}
int hdfsChmod(hdfsFS fs, const char* path, short mode) {
return libhdfs_hdfsChmod(fs->libhdfsRep, path, mode);
return libhdfspp_hdfsChmod(fs->libhdfsppRep, path, mode);
}
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {