From 2a8edd4e523a2e2e90ee570d0c61a600cf7bf610 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 10 Aug 2016 13:27:31 -0400 Subject: [PATCH] HDFS-9271: libhdfs++: Implement basic NN operations. Contributed by Anatoli Shein. --- .../libhdfs-tests/test_libhdfs_threaded.c | 9 +- .../libhdfspp/include/hdfspp/hdfs_ext.h | 14 + .../native/libhdfspp/include/hdfspp/hdfspp.h | 57 ++- .../native/libhdfspp/include/hdfspp/options.h | 7 + .../native/libhdfspp/include/hdfspp/status.h | 4 + .../native/libhdfspp/lib/bindings/c/hdfs.cc | 466 ++++++++++++++++-- .../lib/common/hdfs_configuration.cc | 1 + .../libhdfspp/lib/common/hdfs_configuration.h | 1 + .../native/libhdfspp/lib/common/options.cc | 4 +- .../native/libhdfspp/lib/common/status.cc | 4 + .../main/native/libhdfspp/lib/common/util.cc | 9 + .../main/native/libhdfspp/lib/common/util.h | 2 + .../native/libhdfspp/lib/fs/filehandle.cc | 7 +- .../main/native/libhdfspp/lib/fs/filehandle.h | 7 + .../native/libhdfspp/lib/fs/filesystem.cc | 153 +++++- .../main/native/libhdfspp/lib/fs/filesystem.h | 24 +- .../libhdfspp/lib/fs/namenode_operations.cc | 156 +++++- .../libhdfspp/lib/fs/namenode_operations.h | 21 +- .../native/libhdfspp/tests/hdfs_ext_test.cc | 267 +++++++++- .../main/native/libhdfspp/tests/hdfs_shim.c | 75 ++- 20 files changed, 1157 insertions(+), 131 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c index 1ab3700715..343e05a64b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c @@ -93,13 +93,12 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path) blockSize = hdfsGetDefaultBlockSize(fs); if (blockSize < 0) { - ret = errno; - fprintf(stderr, "hdfsGetDefaultBlockSize failed with error %d\n", ret); - return ret; + fprintf(stderr, "hdfsGetDefaultBlockSize failed with error %d\n", errno); + return -1; } else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) { fprintf(stderr, "hdfsGetDefaultBlockSize got %"PRId64", but we " "expected %d\n", blockSize, TLH_DEFAULT_BLOCK_SIZE); - return EIO; + return -1; } blockSize = hdfsGetDefaultBlockSizeAtPath(fs, path); @@ -205,6 +204,8 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs, EXPECT_ZERO(hdfsHSync(fs, file)); EXPECT_ZERO(hdfsCloseFile(fs, file)); + EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->file1)); + /* There should be 1 entry in the directory. */ hdfsFileInfo * dirList = hdfsListDirectory(fs, paths->prefix, &numEntries); EXPECT_NONNULL(dirList); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index 6ec3a4b582..ce9f0f5034 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -115,6 +115,20 @@ int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key, LIBHDFS_EXTERNAL int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val); + +/** + * Get a configuration long from the settings currently read into the builder. + * + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; -1 otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val); + struct hdfsDNInfo { const char * ip_address; const char * hostname; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 20a651a0dc..46fe8e9bb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -124,6 +124,12 @@ public: */ virtual void SetFileEventCallback(file_event_callback callback) = 0; + /* how many bytes have been successfully read */ + virtual uint64_t get_bytes_read() = 0; + + /* resets the number of bytes read to zero */ + virtual void clear_bytes_read() = 0; + virtual ~FileHandle(); }; @@ -171,6 +177,41 @@ class FileSystem { const std::function &handler) = 0; virtual Status Open(const std::string &path, FileHandle **handle) = 0; + /** + * Get the block size for the given file. + * @param path The path to the file + */ + virtual void GetPreferredBlockSize(const std::string &path, + const std::function &handler) = 0; + virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) = 0; + + /** + * Set replication for an existing file. + *

+ * The NameNode sets replication to the new value and returns. + * The actual block replication is not expected to be performed during + * this method call. The blocks will be populated or removed in the + * background as the result of the routine block maintenance procedures. + * + * @param src file name + * @param replication new replication + */ + virtual void SetReplication(const std::string & path, int16_t replication, std::function handler) = 0; + virtual Status SetReplication(const std::string & path, int16_t replication) = 0; + + /** + * Sets the modification and access time of the file to the specified time. + * @param src The string representation of the path + * @param mtime The number of milliseconds since Jan 1, 1970. + * Setting mtime to -1 means that modification time should not + * be set by this call. + * @param atime The number of milliseconds since Jan 1, 1970. + * Setting atime to -1 means that access time should not be set + * by this call. + */ + virtual void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function handler) = 0; + virtual Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) = 0; + /** * Returns metadata about the file if the file/directory exists. **/ @@ -209,12 +250,12 @@ class FileSystem { std::shared_ptr> & stat_infos) = 0; /** - * Returns the locations of all known blocks for the indicated file, or an error + * Returns the locations of all known blocks for the indicated file (or part of it), or an error * if the information clould not be found */ - virtual void GetBlockLocations(const std::string & path, + virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, const std::function locations)> ) = 0; - virtual Status GetBlockLocations(const std::string & path, + virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::shared_ptr * locations) = 0; /** @@ -224,9 +265,9 @@ class FileSystem { * @param permissions Permissions for the new directory (negative value for the default permissions) * @param createparent Create parent directories if they do not exist (may not be empty) */ - virtual void Mkdirs(const std::string & path, long permissions, bool createparent, + virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, std::function handler) = 0; - virtual Status Mkdirs(const std::string & path, long permissions, bool createparent) = 0; + virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) = 0; /** * Delete the given file or directory from the file system. @@ -257,8 +298,8 @@ class FileSystem { * @param permissions the bitmask to set it to (should be between 0 and 01777) */ virtual void SetPermission(const std::string & path, - short permissions, const std::function &handler) = 0; - virtual Status SetPermission(const std::string & path, short permissions) = 0; + uint16_t permissions, const std::function &handler) = 0; + virtual Status SetPermission(const std::string & path, uint16_t permissions) = 0; /** * Set Owner of a path (i.e. a file or a directory). @@ -335,6 +376,8 @@ class FileSystem { * @param callback The function to call when a reporting event occurs. */ virtual void SetFsEventCallback(fs_event_callback callback) = 0; + + virtual Options get_options() = 0; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h index 1acfe1a94b..55093d05a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h @@ -116,6 +116,13 @@ struct Options { Authentication authentication; static const Authentication kDefaultAuthentication = kSimple; + /** + * Block size in bytes. + * Default: 128 * 1024 * 1024 = 134217728 + **/ + long block_size; + static const long kDefaultBlockSize = 128*1024*1024; + Options(); }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h index 0187786f66..ffbb2bfe4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h @@ -46,12 +46,16 @@ class Status { static Status Canceled(); static Status PathNotFound(const char *msg); static Status InvalidOffset(const char *msg); + static Status PathIsNotDirectory(const char *msg); // success bool ok() const { return code_ == 0; } bool is_invalid_offset() const { return code_ == kInvalidOffset; } + // contains ENOENT error + bool pathNotFound() const { return code_ == kPathNotFound; } + // Returns the string "OK" for success. std::string ToString() const; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index be57a7e79c..7fda4e245f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -43,15 +43,18 @@ static constexpr tPort kDefaultPort = 8020; /* Separate the handles used by the C api from the C++ API*/ struct hdfs_internal { - hdfs_internal(FileSystem *p) : filesystem_(p) {} + hdfs_internal(FileSystem *p) : filesystem_(p), working_directory("/") {} hdfs_internal(std::unique_ptr p) - : filesystem_(std::move(p)) {} + : filesystem_(std::move(p)), working_directory("/") {} virtual ~hdfs_internal(){}; FileSystem *get_impl() { return filesystem_.get(); } const FileSystem *get_impl() const { return filesystem_.get(); } + std::string get_working_directory() { return working_directory; } + void set_working_directory(std::string new_directory) { working_directory = new_directory; } private: std::unique_ptr filesystem_; + std::string working_directory; //has to always start and end with '/' }; struct hdfsFile_internal { @@ -198,6 +201,7 @@ static int ReportCaughtNonException() return Error(Status::Exception("Uncaught value not derived from std::exception", "")); } +/* return false on failure */ bool CheckSystem(hdfsFS fs) { if (!fs) { ReportError(ENODEV, "Cannot perform FS operations with null FS handle."); @@ -208,10 +212,7 @@ bool CheckSystem(hdfsFS fs) { } /* return false on failure */ -bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { - if (!CheckSystem(fs)) - return false; - +bool CheckHandle(hdfsFile file) { if (!file) { ReportError(EBADF, "Cannot perform FS operations with null File handle."); return false; @@ -219,16 +220,60 @@ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { return true; } +/* return false on failure */ +bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { + if (!CheckSystem(fs)) + return false; + + if (!CheckHandle(file)) + return false; + + return true; +} + +optional getAbsolutePath(hdfsFS fs, const char* path) { + //Does not support . (dot) and .. (double dot) semantics + if (!path || path[0] == '\0') { + Error(Status::InvalidArgument("getAbsolutePath: argument 'path' cannot be NULL or empty")); + return optional(); + } + if (path[0] != '/') { + //we know that working directory always ends with '/' + return fs->get_working_directory().append(path); + } + return optional(path); +} + /** * C API implementations **/ int hdfsFileIsOpenForRead(hdfsFile file) { /* files can only be open for reads at the moment, do a quick check */ - if (file) { - return 1; // Update implementation when we get file writing + if (!CheckHandle(file)){ + return 0; + } + return 1; // Update implementation when we get file writing +} + +int hdfsFileIsOpenForWrite(hdfsFile file) { + /* files can only be open for reads at the moment, so return false */ + CheckHandle(file); + return -1; // Update implementation when we get file writing +} + +int hdfsConfGetLong(const char *key, int64_t *val) +{ + try + { + errno = 0; + hdfsBuilder builder; + return hdfsBuilderConfGetLong(&builder, key, val); + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); } - return 0; } hdfsFS doHdfsConnect(optional nn, optional port, optional user, const Options & options) { @@ -329,8 +374,12 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, ReportError(ENODEV, "Cannot perform FS operations with null FS handle."); return nullptr; } + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return nullptr; + } FileHandle *f = nullptr; - Status stat = fs->get_impl()->Open(path, &f); + Status stat = fs->get_impl()->Open(*abs_path, &f); if (!stat.ok()) { Error(stat); return nullptr; @@ -364,11 +413,165 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) { } } +char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { + try + { + errno = 0; + if (!CheckSystem(fs)) { + return nullptr; + } + std::string wd = fs->get_working_directory(); + size_t size = wd.size(); + if (size + 1 > bufferSize) { + std::stringstream ss; + ss << "hdfsGetWorkingDirectory: bufferSize is " << bufferSize << + ", which is not enough to fit working directory of size " << (size + 1); + Error(Status::InvalidArgument(ss.str().c_str())); + return nullptr; + } + wd.copy(buffer, size); + buffer[size] = '\0'; + return buffer; + } catch (const std::exception & e) { + ReportException(e); + return nullptr; + } catch (...) { + ReportCaughtNonException(); + return nullptr; + } +} + +int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { + try + { + errno = 0; + if (!CheckSystem(fs)) { + return -1; + } + optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; + } + //Enforce last character to be '/' + std::string withSlash = *abs_path; + char last = withSlash.back(); + if (last != '/'){ + withSlash += '/'; + } + fs->set_working_directory(withSlash); + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + +int hdfsAvailable(hdfsFS fs, hdfsFile file) { + //Since we do not have read ahead implemented, return 0 if fs and file are good; + errno = 0; + if (!CheckSystemAndHandle(fs, file)) { + return -1; + } + return 0; +} + +tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { + try { + errno = 0; + return fs->get_impl()->get_options().block_size; + } catch (const std::exception & e) { + ReportException(e); + return -1; + } catch (...) { + ReportCaughtNonException(); + return -1; + } +} + +tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) { + try { + errno = 0; + if (!CheckSystem(fs)) { + return -1; + } + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; + } + uint64_t block_size; + Status stat = fs->get_impl()->GetPreferredBlockSize(*abs_path, block_size); + if (!stat.ok()) { + if (stat.pathNotFound()){ + return fs->get_impl()->get_options().block_size; + } else { + return Error(stat); + } + } + return block_size; + } catch (const std::exception & e) { + ReportException(e); + return -1; + } catch (...) { + ReportCaughtNonException(); + return -1; + } +} + +int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { + try { + errno = 0; + if (!CheckSystem(fs)) { + return -1; + } + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; + } + if(replication < 1){ + return Error(Status::InvalidArgument("SetReplication: argument 'replication' cannot be less than 1")); + } + Status stat; + stat = fs->get_impl()->SetReplication(*abs_path, replication); + if (!stat.ok()) { + return Error(stat); + } + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + +int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { + try { + errno = 0; + if (!CheckSystem(fs)) { + return -1; + } + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; + } + Status stat; + stat = fs->get_impl()->SetTimes(*abs_path, mtime, atime); + if (!stat.ok()) { + return Error(stat); + } + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + tOffset hdfsGetCapacity(hdfsFS fs) { try { errno = 0; if (!CheckSystem(fs)) { - return -1; + return -1; } hdfs::FsInfo fs_info; @@ -391,7 +594,7 @@ tOffset hdfsGetUsed(hdfsFS fs) { try { errno = 0; if (!CheckSystem(fs)) { - return -1; + return -1; } hdfs::FsInfo fs_info; @@ -459,15 +662,41 @@ void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info, file_info->mLastAccess = stat_info.access_time; } +int hdfsExists(hdfsFS fs, const char *path) { + try { + errno = 0; + if (!CheckSystem(fs)) { + return -1; + } + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; + } + hdfs::StatInfo stat_info; + Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info); + if (!stat.ok()) { + return Error(stat); + } + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { try { errno = 0; if (!CheckSystem(fs)) { return nullptr; } - + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return nullptr; + } hdfs::StatInfo stat_info; - Status stat = fs->get_impl()->GetFileInfo(path, stat_info); + Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info); if (!stat.ok()) { Error(stat); return nullptr; @@ -491,9 +720,12 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { *numEntries = 0; return nullptr; } - + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return nullptr; + } std::shared_ptr> stat_infos; - Status stat = fs->get_impl()->GetListing(path, stat_infos); + Status stat = fs->get_impl()->GetListing(*abs_path, stat_infos); if (!stat.ok()) { Error(stat); *numEntries = 0; @@ -540,12 +772,13 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) { if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsCreateDirectory: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } Status stat; - //-1 for default permissions and true for creating all non-existant parent directories - stat = fs->get_impl()->Mkdirs(path, -1, true); + //Use default permissions and set true for creating all non-existant parent directories + stat = fs->get_impl()->Mkdirs(*abs_path, NameNodeOperations::GetDefaultPermissionMask(), true); if (!stat.ok()) { return Error(stat); } @@ -563,11 +796,12 @@ int hdfsDelete(hdfsFS fs, const char* path, int recursive) { if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsDelete: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } Status stat; - stat = fs->get_impl()->Delete(path, recursive); + stat = fs->get_impl()->Delete(*abs_path, recursive); if (!stat.ok()) { return Error(stat); } @@ -585,14 +819,13 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { if (!CheckSystem(fs)) { return -1; } - if (!oldPath) { - return Error(Status::InvalidArgument("hdfsRename: argument 'oldPath' cannot be NULL")); - } - if (!newPath) { - return Error(Status::InvalidArgument("hdfsRename: argument 'newPath' cannot be NULL")); + const optional old_abs_path = getAbsolutePath(fs, oldPath); + const optional new_abs_path = getAbsolutePath(fs, newPath); + if(!old_abs_path || !new_abs_path) { + return -1; } Status stat; - stat = fs->get_impl()->Rename(oldPath, newPath); + stat = fs->get_impl()->Rename(*old_abs_path, *new_abs_path); if (!stat.ok()) { return Error(stat); } @@ -610,14 +843,15 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){ if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsChmod: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } Status stat = NameNodeOperations::CheckValidPermissionMask(mode); if (!stat.ok()) { return Error(stat); } - stat = fs->get_impl()->SetPermission(path, mode); + stat = fs->get_impl()->SetPermission(*abs_path, mode); if (!stat.ok()) { return Error(stat); } @@ -635,14 +869,15 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsChown: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } std::string own = (owner) ? owner : ""; std::string grp = (group) ? group : ""; Status stat; - stat = fs->get_impl()->SetOwner(path, own, grp); + stat = fs->get_impl()->SetOwner(*abs_path, own, grp); if (!stat.ok()) { return Error(stat); } @@ -660,14 +895,15 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) { if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsCreateSnapshot: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } Status stat; if(!name){ - stat = fs->get_impl()->CreateSnapshot(path, ""); + stat = fs->get_impl()->CreateSnapshot(*abs_path, ""); } else { - stat = fs->get_impl()->CreateSnapshot(path, name); + stat = fs->get_impl()->CreateSnapshot(*abs_path, name); } if (!stat.ok()) { return Error(stat); @@ -686,14 +922,15 @@ int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) { if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } if (!name) { return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'name' cannot be NULL")); } Status stat; - stat = fs->get_impl()->DeleteSnapshot(path, name); + stat = fs->get_impl()->DeleteSnapshot(*abs_path, name); if (!stat.ok()) { return Error(stat); } @@ -711,11 +948,12 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path) { if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsAllowSnapshot: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } Status stat; - stat = fs->get_impl()->AllowSnapshot(path); + stat = fs->get_impl()->AllowSnapshot(*abs_path); if (!stat.ok()) { return Error(stat); } @@ -733,11 +971,12 @@ int hdfsDisallowSnapshot(hdfsFS fs, const char* path) { if (!CheckSystem(fs)) { return -1; } - if (!path) { - return Error(Status::InvalidArgument("hdfsDisallowSnapshot: argument 'path' cannot be NULL")); + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; } Status stat; - stat = fs->get_impl()->DisallowSnapshot(path); + stat = fs->get_impl()->DisallowSnapshot(*abs_path); if (!stat.ok()) { return Error(stat); } @@ -793,6 +1032,55 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) { } } +int hdfsUnbufferFile(hdfsFile file) { + //Currently we are not doing any buffering + CheckHandle(file); + return -1; +} + +int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) { + try + { + errno = 0; + if (!CheckHandle(file)) { + return -1; + } + *stats = new hdfsReadStatistics; + memset(*stats, 0, sizeof(hdfsReadStatistics)); + (*stats)->totalBytesRead = file->get_impl()->get_bytes_read(); + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + +int hdfsFileClearReadStatistics(hdfsFile file) { + try + { + errno = 0; + if (!CheckHandle(file)) { + return -1; + } + file->get_impl()->clear_bytes_read(); + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + +int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) { + return stats->totalBytesRead - stats->totalLocalBytesRead; +} + +void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) { + errno = 0; + delete stats; +} + /* 0 on success, -1 on error*/ int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { try @@ -868,9 +1156,12 @@ int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations"); return -1; } - + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return -1; + } std::shared_ptr ppLocations; - Status stat = fs->get_impl()->GetBlockLocations(path, &ppLocations); + Status stat = fs->get_impl()->GetBlockLocations(*abs_path, 0, std::numeric_limits::max(), &ppLocations); if (!stat.ok()) { return Error(stat); } @@ -943,6 +1234,59 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) { return 0; } +char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { + try + { + errno = 0; + if (!CheckSystem(fs)) { + return nullptr; + } + const optional abs_path = getAbsolutePath(fs, path); + if(!abs_path) { + return nullptr; + } + std::shared_ptr ppLocations; + Status stat = fs->get_impl()->GetBlockLocations(*abs_path, start, length, &ppLocations); + if (!stat.ok()) { + Error(stat); + return nullptr; + } + const std::vector & ppBlockLocations = ppLocations->getBlockLocations(); + char ***hosts = new char**[ppBlockLocations.size() + 1]; + for (size_t i=0; i < ppBlockLocations.size(); i++) { + const std::vector & ppDNInfos = ppBlockLocations[i].getDataNodes(); + hosts[i] = new char*[ppDNInfos.size() + 1]; + for (size_t j=0; j < ppDNInfos.size(); j++) { + auto ppDNInfo = ppDNInfos[j]; + hosts[i][j] = new char[ppDNInfo.getHostname().size() + 1]; + strncpy(hosts[i][j], ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1); + } + hosts[i][ppDNInfos.size()] = nullptr; + } + hosts[ppBlockLocations.size()] = nullptr; + return hosts; + } catch (const std::exception & e) { + ReportException(e); + return nullptr; + } catch (...) { + ReportCaughtNonException(); + return nullptr; + } +} + +void hdfsFreeHosts(char ***blockHosts) { + errno = 0; + if (blockHosts == nullptr) + return; + + for (size_t i = 0; blockHosts[i]; i++) { + for (size_t j = 0; blockHosts[i][j]; j++) { + delete[] blockHosts[i][j]; + } + delete[] blockHosts[i]; + } + delete blockHosts; +} /******************************************************************* * EVENT CALLBACKS @@ -1234,6 +1578,28 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val } } +int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val) +{ + try + { + errno = 0; + // Pull from default configuration + optional value = bld->config.GetInt(key); + if (value) + { + *val = *value; + return 0; + } + // If not found, don't change val + ReportError(EINVAL, "Could not get Builder value"); + return 0; + } catch (const std::exception & e) { + return ReportException(e); + } catch (...) { + return ReportCaughtNonException(); + } +} + /** * Logging functions **/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc index 70775b8a2a..6778bad783 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc @@ -144,6 +144,7 @@ Options HdfsConfiguration::GetOptions() { OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey)); OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey)); OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey)); + OptionalSet(result.block_size, GetInt(kDfsBlockSizeKey)); OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h index 459364f8af..d6f902ebe9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h @@ -45,6 +45,7 @@ class HdfsConfiguration : public Configuration { static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication"; static constexpr const char * kHadoopSecurityAuthentication_simple = "simple"; static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos"; + static constexpr const char * kDfsBlockSizeKey = "dfs.blocksize"; static constexpr const char * kDfsClientFailoverMaxAttempts = "dfs.client.failover.max.attempts"; static constexpr const char * kDfsClientFailoverConnectionRetriesOnTimeouts = "dfs.client.failover.connection.retries.on.timeouts"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc index 29b45b24e1..a889be58c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -28,6 +28,7 @@ const int Options::kDefaultRpcRetryDelayMs; const unsigned int Options::kDefaultHostExclusionDuration; const unsigned int Options::kDefaultFailoverMaxRetries; const unsigned int Options::kDefaultFailoverConnectionMaxRetries; +const long Options::kDefaultBlockSize; Options::Options() : rpc_timeout(kDefaultRpcTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout), @@ -37,7 +38,8 @@ Options::Options() : rpc_timeout(kDefaultRpcTimeout), defaultFS(), failover_max_retries(kDefaultFailoverMaxRetries), failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries), - authentication(kDefaultAuthentication) {} + authentication(kDefaultAuthentication), + block_size(kDefaultBlockSize) {} std::string NamenodeInfo::get_host() const { return uri.get_host(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc index b351900c75..540f8c17c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc @@ -89,6 +89,10 @@ Status Status::ResourceUnavailable(const char *msg) { return Status(kResourceUnavailable, msg); } +Status Status::PathIsNotDirectory(const char *msg) { + return Status(kNotADirectory, msg); +} + Status Status::Unimplemented() { return Status(kUnimplemented, ""); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc index 7bb1e30080..853ada867f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -107,6 +107,15 @@ std::string SafeDisconnect(asio::ip::tcp::socket *sock) { return err; } +bool IsHighBitSet(uint64_t num) { + uint64_t firstBit = (uint64_t) 1 << 63; + if (num & firstBit) { + return true; + } else { + return false; + } +} + } void ShutdownProtobufLibrary_C() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index da63435c08..7f0e572188 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -110,6 +110,8 @@ inline asio::ip::tcp::socket *get_asio_socket_ptr return s; } +//Check if the high bit is set +bool IsHighBitSet(uint64_t num); } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index f40b81c886..260094417c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -41,7 +41,7 @@ FileHandleImpl::FileHandleImpl(const std::string & cluster_name, std::shared_ptr bad_data_nodes, std::shared_ptr event_handlers) : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), - bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers) { + bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) { LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" << FMT_THIS_ADDR << ", ...) called"); @@ -68,6 +68,7 @@ void FileHandleImpl::PositionRead( bad_node_tracker_->AddBadNode(contacted_datanode); } + bytes_read_ += bytes_read; handler(status, bytes_read); }; @@ -352,4 +353,8 @@ bool FileHandle::ShouldExclude(const Status &s) { } } +uint64_t FileHandleImpl::get_bytes_read() { return bytes_read_; } + +void FileHandleImpl::clear_bytes_read() { bytes_read_ = 0; } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 38c1fec710..03c55fff18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -113,6 +113,12 @@ public: **/ std::shared_ptr get_event_handlers(); + /* how many bytes have been successfully read */ + virtual uint64_t get_bytes_read() override; + + /* resets the number of bytes read to zero */ + virtual void clear_bytes_read() override; + protected: virtual std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, std::shared_ptr dn, @@ -133,6 +139,7 @@ private: CancelHandle cancel_state_; ReaderGroup readers_; std::shared_ptr event_handlers_; + uint64_t bytes_read_; }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index ae55e0b456..de6ebb7029 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -243,14 +243,13 @@ void FileSystemImpl::Open( << FMT_THIS_ADDR << ", path=" << path << ") called"); - nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr file_info) { + nn_.GetBlockLocations(path, 0, std::numeric_limits::max(), [this, path, handler](const Status &stat, std::shared_ptr file_info) { if(!stat.ok()) { LOG_INFO(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString()); if(stat.get_server_exception_type() == Status::kStandbyException) { LOG_INFO(kFileSystem, << "Operation not allowed on standby datanode"); } } - handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) : nullptr); }); @@ -326,13 +325,24 @@ BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto return result; } -void FileSystemImpl::GetBlockLocations(const std::string & path, +void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, const std::function locations)> handler) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + //Protobuf gives an error 'Negative value is not supported' + //if the high bit is set in uint64 in GetBlockLocations + if (IsHighBitSet(offset)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr); + return; + } + if (IsHighBitSet(length)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr); + return; + } + auto conversion = [handler](const Status & status, std::shared_ptr fileInfo) { if (status.ok()) { auto result = std::make_shared(); @@ -354,10 +364,10 @@ void FileSystemImpl::GetBlockLocations(const std::string & path, } }; - nn_.GetBlockLocations(path, conversion); + nn_.GetBlockLocations(path, offset, length, conversion); } -Status FileSystemImpl::GetBlockLocations(const std::string & path, +Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::shared_ptr * fileBlockLocations) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" @@ -375,7 +385,7 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path, callstate->set_value(std::make_tuple(s,blockInfo)); }; - GetBlockLocations(path, callback); + GetBlockLocations(path, offset, length, callback); /* wait for async to finish */ auto returnstate = future.get(); @@ -390,6 +400,119 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path, return stat; } +void FileSystemImpl::GetPreferredBlockSize(const std::string &path, + const std::function &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + nn_.GetPreferredBlockSize(path, handler); +} + +Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared>>(); + std::future> future(callstate->get_future()); + + /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const uint64_t & bsize) { + callstate->set_value(std::make_tuple(s, bsize)); + }; + + GetPreferredBlockSize(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + uint64_t size = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + block_size = size; + return stat; +} + +void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); + return; + } + Status replStatus = NameNodeOperations::CheckValidReplication(replication); + if (!replStatus.ok()) { + handler(replStatus); + return; + } + + nn_.SetReplication(path, replication, handler); +} + +Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + auto callstate = std::make_shared>(); + std::future future(callstate->get_future()); + + /* wrap async FileSystem::SetReplication with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetReplication(path, replication, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, + std::function handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty")); + return; + } + + nn_.SetTimes(path, mtime, atime, handler); +} + +Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + auto callstate = std::make_shared>(); + std::future future(callstate->get_future()); + + /* wrap async FileSystem::SetTimes with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetTimes(path, mtime, atime, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + void FileSystemImpl::GetFileInfo( const std::string &path, const std::function &handler) { @@ -543,7 +666,7 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr handler) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << @@ -554,10 +677,16 @@ void FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool cre return; } + Status permStatus = NameNodeOperations::CheckValidPermissionMask(permissions); + if (!permStatus.ok()) { + handler(permStatus); + return; + } + nn_.Mkdirs(path, permissions, createparent, handler); } -Status FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool createparent) { +Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ", createparent=" << createparent << ") called"); @@ -653,7 +782,7 @@ Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &new } void FileSystemImpl::SetPermission(const std::string & path, - short permissions, const std::function &handler) { + uint16_t permissions, const std::function &handler) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); @@ -670,7 +799,7 @@ void FileSystemImpl::SetPermission(const std::string & path, nn_.SetPermission(path, permissions, handler); } -Status FileSystemImpl::SetPermission(const std::string & path, short permissions) { +Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); @@ -896,4 +1025,8 @@ std::shared_ptr FileSystemImpl::get_event_handlers() { return event_handlers_; } +Options FileSystemImpl::get_options() { + return options_; +} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 73ad7eba08..75a43f3430 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -65,6 +65,16 @@ public: &handler) override; Status Open(const std::string &path, FileHandle **handle) override; + virtual void GetPreferredBlockSize(const std::string &path, + const std::function &handler) override; + virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override; + + virtual void SetReplication(const std::string & path, int16_t replication, std::function handler) override; + virtual Status SetReplication(const std::string & path, int16_t replication) override; + + void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function handler) override; + Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override; + void GetFileInfo( const std::string &path, const std::function &handler) override; @@ -88,14 +98,14 @@ public: Status GetListing(const std::string &path, std::shared_ptr> &stat_infos) override; - virtual void GetBlockLocations(const std::string & path, + virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, const std::function locations)> ) override; - virtual Status GetBlockLocations(const std::string & path, + virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::shared_ptr * locations) override; - virtual void Mkdirs(const std::string & path, long permissions, bool createparent, + virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, std::function handler) override; - virtual Status Mkdirs(const std::string & path, long permissions, bool createparent) override; + virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override; virtual void Delete(const std::string &path, bool recursive, const std::function &handler) override; @@ -106,8 +116,8 @@ public: virtual Status Rename(const std::string &oldPath, const std::string &newPath) override; virtual void SetPermission(const std::string & path, - short permissions, const std::function &handler) override; - virtual Status SetPermission(const std::string & path, short permissions) override; + uint16_t permissions, const std::function &handler) override; + virtual Status SetPermission(const std::string & path, uint16_t permissions) override; virtual void SetOwner(const std::string & path, const std::string & username, const std::string & groupname, const std::function &handler) override; @@ -166,6 +176,8 @@ public: /* all monitored events will need to lookup handlers */ std::shared_ptr get_event_handlers(); + Options get_options(); + private: const Options options_; const std::string client_name_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc index 8b4c1262bb..27ccb5dee9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc @@ -39,14 +39,26 @@ namespace hdfs { * NAMENODE OPERATIONS ****************************************************************************/ -Status NameNodeOperations::CheckValidPermissionMask(short permissions) { - if (permissions < 0 || permissions > 01777) { +uint16_t NameNodeOperations::GetDefaultPermissionMask() { + return 0755; +} + +Status NameNodeOperations::CheckValidPermissionMask(uint16_t permissions) { + if (permissions > 01777) { std::stringstream errormsg; - errormsg << "IsValidPermissionMask: argument 'permissions' is " << std::oct + errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct << std::showbase << permissions << " (should be between 0 and 01777)"; - //Avoid copying by binding errormsg.str() to a const reference, which extends its lifetime - const std::string& tmp = errormsg.str(); - return Status::InvalidArgument(tmp.c_str()); + return Status::InvalidArgument(errormsg.str().c_str()); + } + return Status::OK(); +} + +Status NameNodeOperations::CheckValidReplication(uint16_t replication) { + if (replication < 1 || replication > 512) { + std::stringstream errormsg; + errormsg << "CheckValidReplication: argument 'replication' is " + << replication << " (should be between 1 and 512)"; + return Status::InvalidArgument(errormsg.str().c_str()); } return Status::OK(); } @@ -57,7 +69,7 @@ void NameNodeOperations::Connect(const std::string &cluster_name, engine_.Connect(cluster_name, servers, handler); } -void NameNodeOperations::GetBlockLocations(const std::string & path, +void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::function)> handler) { using ::hadoop::hdfs::GetBlockLocationsRequestProto; @@ -71,10 +83,21 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, return; } + //Protobuf gives an error 'Negative value is not supported' + //if the high bit is set in uint64 in GetBlockLocations + if (IsHighBitSet(offset)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr); + return; + } + if (IsHighBitSet(length)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr); + return; + } + GetBlockLocationsRequestProto req; req.set_src(path); - req.set_offset(0); - req.set_length(std::numeric_limits::max()); + req.set_offset(offset); + req.set_length(length); auto resp = std::make_shared(); @@ -104,6 +127,106 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, }); } +void NameNodeOperations::GetPreferredBlockSize(const std::string & path, + std::function handler) +{ + using ::hadoop::hdfs::GetPreferredBlockSizeRequestProto; + using ::hadoop::hdfs::GetPreferredBlockSizeResponseProto; + + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("GetPreferredBlockSize: argument 'path' cannot be empty"), -1); + return; + } + + GetPreferredBlockSizeRequestProto req; + req.set_filename(path); + + auto resp = std::make_shared(); + + namenode_.GetPreferredBlockSize(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok() && resp -> has_bsize()) { + uint64_t block_size = resp -> bsize(); + handler(stat, block_size); + } else { + handler(stat, -1); + } + }); +} + +void NameNodeOperations::SetReplication(const std::string & path, int16_t replication, + std::function handler) +{ + using ::hadoop::hdfs::SetReplicationRequestProto; + using ::hadoop::hdfs::SetReplicationResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); + return; + } + Status replStatus = CheckValidReplication(replication); + if (!replStatus.ok()) { + handler(replStatus); + return; + } + SetReplicationRequestProto req; + req.set_src(path); + req.set_replication(replication); + + auto resp = std::make_shared(); + + namenode_.SetReplication(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok()) { + // Checking resp + if(resp -> has_result() && resp ->result() == 1) { + handler(stat); + } else { + //NameNode does not specify why there is no result, in my testing it was happening when the path is not found + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew); + } + } else { + handler(stat); + } + }); +} + +void NameNodeOperations::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, + std::function handler) +{ + using ::hadoop::hdfs::SetTimesRequestProto; + using ::hadoop::hdfs::SetTimesResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty")); + return; + } + + SetTimesRequestProto req; + req.set_src(path); + req.set_mtime(mtime); + req.set_atime(atime); + + auto resp = std::make_shared(); + + namenode_.SetTimes(&req, resp, [resp, handler, path](const Status &stat) { + handler(stat); + }); +} + + + void NameNodeOperations::GetFileInfo(const std::string & path, std::function handler) { @@ -216,7 +339,7 @@ void NameNodeOperations::GetListing( }); } -void NameNodeOperations::Mkdirs(const std::string & path, long permissions, bool createparent, +void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, std::function handler) { using ::hadoop::hdfs::MkdirsRequestProto; @@ -232,13 +355,14 @@ void NameNodeOperations::Mkdirs(const std::string & path, long permissions, bool } MkdirsRequestProto req; + Status permStatus = CheckValidPermissionMask(permissions); + if (!permStatus.ok()) { + handler(permStatus); + return; + } req.set_src(path); hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked(); - if (permissions < 0) { - perm->set_perm(0755); - } else { - perm->set_perm(permissions); - } + perm->set_perm(permissions); req.set_createparent(createparent); auto resp = std::make_shared(); @@ -336,7 +460,7 @@ void NameNodeOperations::Rename(const std::string & oldPath, const std::string & } void NameNodeOperations::SetPermission(const std::string & path, - short permissions, std::function handler) { + uint16_t permissions, std::function handler) { using ::hadoop::hdfs::SetPermissionRequestProto; using ::hadoop::hdfs::SetPermissionResponseProto; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index 9651570739..3afa2e9a1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -48,15 +48,28 @@ public: engine_(io_service, options, client_name, user_name, protocol_name, protocol_version), namenode_(& engine_), options_(options) {} - static Status CheckValidPermissionMask(short permissions); + static uint16_t GetDefaultPermissionMask(); + + static Status CheckValidPermissionMask(uint16_t permissions); + + static Status CheckValidReplication(uint16_t replication); void Connect(const std::string &cluster_name, const std::vector &servers, std::function &&handler); - void GetBlockLocations(const std::string & path, + void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::function)> handler); + void GetPreferredBlockSize(const std::string & path, + std::function handler); + + void SetReplication(const std::string & path, int16_t replication, + std::function handler); + + void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, + std::function handler); + void GetFileInfo(const std::string & path, std::function handler); @@ -67,7 +80,7 @@ public: std::function>&, bool)> handler, const std::string & start_after = ""); - void Mkdirs(const std::string & path, long permissions, bool createparent, + void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, std::function handler); void Delete(const std::string & path, bool recursive, @@ -76,7 +89,7 @@ public: void Rename(const std::string & oldPath, const std::string & newPath, std::function handler); - void SetPermission(const std::string & path, short permissions, + void SetPermission(const std::string & path, uint16_t permissions, std::function handler); void SetOwner(const std::string & path, const std::string & username, diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc index e28e4e9e38..c160f7fb06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc @@ -39,8 +39,8 @@ TEST_F(HdfsExtTest, TestGetBlockLocations) { EXPECT_EQ(0, result); // Test non-extant files - result = hdfsGetBlockLocations(connection, "non_extant_file", &blocks); - EXPECT_NE(0, result); // Should be an error + EXPECT_EQ(-1, hdfsGetBlockLocations(connection, "non_extant_file", &blocks)); // Should be an error + EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno); // Test an extant file std::string filename = connection.newFile(1024); @@ -296,7 +296,6 @@ TEST_F(HdfsExtTest, TestEOF) { HdfsHandle connection = cluster.connect_c(); hdfsFS fs = connection.handle(); EXPECT_NE(nullptr, fs); - //Write to a file errno = 0; int size = 256; @@ -308,28 +307,284 @@ TEST_F(HdfsExtTest, TestEOF) { EXPECT_EQ(size, hdfsWrite(fs, file, buf, size)); free(buf); EXPECT_EQ(0, hdfsCloseFile(fs, file)); - EXPECT_EQ(0, errno); + //libhdfs file operations work, but sometimes sets errno ENOENT : 2 //Test normal reading (no EOF) char buffer[300]; - EXPECT_EQ(0, errno); file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0); EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer))); //Read executes correctly, but causes a warning (captured in HDFS-10595) //and sets errno to EINPROGRESS 115 : Operation now in progress - errno = 0; //Test reading at offset past the EOF EXPECT_EQ(-1, hdfsPread(fs, file, sizeof(buffer), buffer, sizeof(buffer))); EXPECT_EQ(Status::kInvalidOffset, errno); + EXPECT_EQ(0, hdfsCloseFile(fs, file)); +} + + //Testing hdfsExists + TEST_F(HdfsExtTest, TestExists) { + + HdfsHandle connection = cluster.connect_c(); + hdfsFS fs = connection.handle(); + EXPECT_NE(nullptr, fs); + //Path not found + EXPECT_EQ(-1, hdfsExists(fs, "/wrong/dir/")); + EXPECT_EQ((int ) std::errc::no_such_file_or_directory, errno); + + //Correct operation + std::string pathDir = "/testExistsDir"; + EXPECT_EQ(0, hdfsCreateDirectory(fs, pathDir.c_str())); + EXPECT_EQ(0, hdfsExists(fs, pathDir.c_str())); + std::string pathFile = connection.newFile(pathDir.c_str(), 1024); + EXPECT_EQ(0, hdfsExists(fs, pathFile.c_str())); + + //Permission denied + EXPECT_EQ(0, hdfsChmod(fs, pathDir.c_str(), 0700)); + HdfsHandle connection2 = cluster.connect_c("OtherGuy"); + hdfsFS fs2 = connection2.handle(); + EXPECT_EQ(-1, hdfsExists(fs2, pathFile.c_str())); + EXPECT_EQ((int ) std::errc::permission_denied, errno); +} + +//Testing Replication and Time modifications +TEST_F(HdfsExtTest, TestReplAndTime) { + HdfsHandle connection = cluster.connect_c(); + hdfsFS fs = connection.handle(); + EXPECT_NE(nullptr, fs); + + std::string path = "/wrong/dir/"; + + //Path not found + EXPECT_EQ(-1, hdfsSetReplication(fs, path.c_str(), 3)); + EXPECT_EQ((int ) std::errc::no_such_file_or_directory, errno); + EXPECT_EQ(-1, hdfsUtime(fs, path.c_str(), 1000000, 1000000)); + EXPECT_EQ((int ) std::errc::no_such_file_or_directory, errno); + + //Correct operation + path = connection.newFile(1024); + EXPECT_EQ(0, hdfsSetReplication(fs, path.c_str(), 7)); + EXPECT_EQ(0, hdfsUtime(fs, path.c_str(), 123456789, 987654321)); + hdfsFileInfo *file_info; + EXPECT_NE(nullptr, file_info = hdfsGetPathInfo(fs, path.c_str())); + EXPECT_EQ(7, file_info->mReplication); + EXPECT_EQ(123456789, file_info->mLastMod); + EXPECT_EQ(987654321, file_info->mLastAccess); + hdfsFreeFileInfo(file_info, 1); + + //Wrong arguments + EXPECT_EQ(-1, hdfsSetReplication(fs, path.c_str(), 0)); + EXPECT_EQ((int ) std::errc::invalid_argument, errno); + EXPECT_EQ(-1, hdfsSetReplication(fs, path.c_str(), 513)); + EXPECT_EQ((int ) std::errc::invalid_argument, errno); + + //Permission denied + EXPECT_EQ(0, hdfsChmod(fs, path.c_str(), 0700)); + HdfsHandle connection2 = cluster.connect_c("OtherGuy"); + hdfsFS fs2 = connection2.handle(); + EXPECT_EQ(-1, hdfsSetReplication(fs2, path.c_str(), 3)); + EXPECT_EQ((int ) std::errc::permission_denied, errno); + EXPECT_EQ(-1, hdfsUtime(fs2, path.c_str(), 111111111, 222222222)); + EXPECT_EQ((int ) std::errc::permission_denied, errno); +} + +//Testing getting default block size at path +TEST_F(HdfsExtTest, TestDefaultBlockSize) { + HdfsHandle connection = cluster.connect_c(); + hdfsFS fs = connection.handle(); + EXPECT_NE(nullptr, fs); + + //Correct operation (existing path) + std::string path = connection.newFile(1024); + long block_size = hdfsGetDefaultBlockSizeAtPath(fs, path.c_str()); + EXPECT_GT(block_size, 0); + hdfsFileInfo *file_info; + EXPECT_NE(nullptr, file_info = hdfsGetPathInfo(fs, path.c_str())); + EXPECT_EQ(block_size, file_info->mBlockSize); + hdfsFreeFileInfo(file_info, 1); + + //Non-existing path + path = "/wrong/dir/"; + EXPECT_GT(hdfsGetDefaultBlockSizeAtPath(fs, path.c_str()), 0); + + //No path specified + EXPECT_GT(hdfsGetDefaultBlockSize(fs), 0); +} + +//Testing getting hosts +TEST_F(HdfsExtTest, TestHosts) { + HdfsHandle connection = cluster.connect_c(); + hdfsFS fs = connection.handle(); + EXPECT_NE(nullptr, fs); + + char *** hosts = nullptr; + + // Free a null pointer + hdfsFreeHosts(hosts); + EXPECT_EQ(0, errno); + + // Test non-existent files + EXPECT_EQ(nullptr, hdfsGetHosts(fs, "/wrong/file/", 0, std::numeric_limits::max())); + EXPECT_EQ((int ) std::errc::no_such_file_or_directory, errno); + + // Test an existent file + std::string filename = connection.newFile(1024); + EXPECT_NE(nullptr, hosts = hdfsGetHosts(fs, filename.c_str(), 0, std::numeric_limits::max())); + + //Make sure there is at least one host + EXPECT_NE(nullptr, *hosts); + EXPECT_NE(nullptr, **hosts); + + hdfsFreeHosts(hosts); + EXPECT_EQ(0, errno); + + //Test invalid arguments + EXPECT_EQ(nullptr, hdfsGetHosts(fs, filename.c_str(), 0, std::numeric_limits::max()+1)); + EXPECT_EQ((int) std::errc::invalid_argument, errno); + + //Test invalid arguments + EXPECT_EQ(nullptr, hdfsGetHosts(fs, filename.c_str(), std::numeric_limits::max()+1, std::numeric_limits::max())); + EXPECT_EQ((int) std::errc::invalid_argument, errno); +} + +//Testing read statistics +TEST_F(HdfsExtTest, TestReadStats) { + HdfsHandle connection = cluster.connect_c(); + hdfsFS fs = connection.handle(); + EXPECT_NE(nullptr, fs); + + struct hdfsReadStatistics *stats; + + //Write to a file + int size = 256; + std::string path = "/readStatTest"; + hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0); + EXPECT_NE(nullptr, file); + void * buf = malloc(size); + bzero(buf, size); + EXPECT_EQ(size, hdfsWrite(fs, file, buf, size)); + free(buf); + EXPECT_EQ(0, hdfsCloseFile(fs, file)); + + //test before reading + file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0); + EXPECT_EQ(0, hdfsFileGetReadStatistics(file, &stats)); + EXPECT_EQ(0, stats->totalBytesRead); + hdfsFileFreeReadStatistics(stats); + + //test after reading + char buffer[123]; + //Read executes correctly, but causes a warning (captured in HDFS-10595) + EXPECT_EQ(sizeof(buffer), hdfsRead(fs, file, buffer, sizeof(buffer))); + EXPECT_EQ(0, hdfsFileGetReadStatistics(file, &stats)); + EXPECT_EQ(sizeof(buffer), stats->totalBytesRead); + EXPECT_EQ(sizeof(buffer), stats->totalLocalBytesRead); + EXPECT_EQ(0, hdfsReadStatisticsGetRemoteBytesRead(stats)); + hdfsFileFreeReadStatistics(stats); + + //test after clearing + EXPECT_EQ(0, hdfsFileClearReadStatistics(file)); + EXPECT_EQ(0, hdfsFileGetReadStatistics(file, &stats)); + EXPECT_EQ(0, stats->totalBytesRead); + hdfsFileFreeReadStatistics(stats); + EXPECT_EQ(0, hdfsCloseFile(fs, file)); EXPECT_EQ(0, errno); } +//Testing working directory +TEST_F(HdfsExtTest, TestWorkingDirectory) { + HdfsHandle connection = cluster.connect_c(); + hdfsFS fs = connection.handle(); + EXPECT_NE(nullptr, fs); + + //Correct operation of setter and getter + std::string pathDir = "/testWorkDir/"; + EXPECT_EQ(0, hdfsCreateDirectory(fs, pathDir.c_str())); + std::string pathFile = connection.newFile(pathDir.c_str(), 1024); + EXPECT_EQ(0, hdfsSetWorkingDirectory(fs, pathDir.c_str())); + char array[100]; + EXPECT_STREQ(pathDir.c_str(), hdfsGetWorkingDirectory(fs, array, 100)); + + //Get relative path + std::size_t slashPos = pathFile.find_last_of("/"); + std::string fileName = pathFile.substr(slashPos + 1); + + //Testing various functions with relative path: + + //hdfsGetDefaultBlockSizeAtPath + EXPECT_GT(hdfsGetDefaultBlockSizeAtPath(fs, fileName.c_str()), 0); + + //hdfsSetReplication + EXPECT_EQ(0, hdfsSetReplication(fs, fileName.c_str(), 7)); + + //hdfsUtime + EXPECT_EQ(0, hdfsUtime(fs, fileName.c_str(), 123456789, 987654321)); + + //hdfsExists + EXPECT_EQ(0, hdfsExists(fs, fileName.c_str())); + + //hdfsGetPathInfo + hdfsFileInfo *file_info; + EXPECT_NE(nullptr, file_info = hdfsGetPathInfo(fs, fileName.c_str())); + hdfsFreeFileInfo(file_info, 1); + + //hdfsOpenFile + hdfsFile file; + file = hdfsOpenFile(fs, fileName.c_str(), O_RDONLY, 0, 0, 0); + EXPECT_EQ(0, hdfsCloseFile(fs, file)); + + //hdfsCreateDirectory + EXPECT_EQ(0, hdfsCreateDirectory(fs, "newDir")); + + //add another file + std::string fileName2 = connection.newFile(pathDir + "/newDir", 1024); + + //hdfsListDirectory + int numEntries; + hdfsFileInfo * dirList; + EXPECT_NE(nullptr, dirList = hdfsListDirectory(fs, "newDir", &numEntries)); + EXPECT_EQ(1, numEntries); + hdfsFreeFileInfo(dirList, 1); + + //hdfsChmod + EXPECT_EQ(0, hdfsChmod(fs, fileName.c_str(), 0777)); + + //hdfsChown + EXPECT_EQ(0, hdfsChown(fs, fileName.c_str(), "cool", "nice")); + + //hdfsDisallowSnapshot + EXPECT_EQ(0, hdfsDisallowSnapshot(fs, "newDir")); + + //hdfsAllowSnapshot + EXPECT_EQ(0, hdfsAllowSnapshot(fs, "newDir")); + + //hdfsCreateSnapshot + EXPECT_EQ(0, hdfsCreateSnapshot(fs, "newDir", "Some")); + + //hdfsDeleteSnapshot + EXPECT_EQ(0, hdfsDeleteSnapshot(fs, "newDir", "Some")); + + //hdfsGetBlockLocations + hdfsBlockLocations * blocks = nullptr; + EXPECT_EQ(0, hdfsGetBlockLocations(connection, fileName.c_str(), &blocks)); + hdfsFreeBlockLocations(blocks); + + //hdfsGetHosts + char *** hosts; + EXPECT_NE(nullptr, hosts = hdfsGetHosts(fs, fileName.c_str(), 0, std::numeric_limits::max())); + hdfsFreeHosts(hosts); + + //hdfsRename + EXPECT_EQ(0, hdfsRename(fs, fileName.c_str(), "new_file_name")); + + //hdfsDelete + EXPECT_EQ(0, hdfsDelete(fs, "new_file_name", 0)); } +} int main(int argc, char *argv[]) { // The following line must be executed to initialize Google Mock diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c index 32f920f9e3..fd82da310a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c @@ -50,25 +50,25 @@ int hdfsFileIsOpenForWrite(hdfsFile file) { return libhdfs_hdfsFileIsOpenForWrite(file->libhdfsRep); } -int hdfsFileGetReadStatistics(hdfsFile file, - struct hdfsReadStatistics **stats) { - return libhdfs_hdfsFileGetReadStatistics - (file->libhdfsRep, (struct libhdfs_hdfsReadStatistics **)stats); +int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) { + //We do not track which bytes were remote or local, so we assume all are local + int ret = libhdfspp_hdfsFileGetReadStatistics(file->libhdfsppRep, (struct libhdfspp_hdfsReadStatistics **)stats); + if(!ret) { + (*stats)->totalLocalBytesRead = (*stats)->totalBytesRead; + } + return ret; } -int64_t hdfsReadStatisticsGetRemoteBytesRead( - const struct hdfsReadStatistics *stats) { - return libhdfs_hdfsReadStatisticsGetRemoteBytesRead - ((struct libhdfs_hdfsReadStatistics *)stats); +int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) { + return libhdfspp_hdfsReadStatisticsGetRemoteBytesRead((struct libhdfspp_hdfsReadStatistics *)stats); } int hdfsFileClearReadStatistics(hdfsFile file) { - return libhdfs_hdfsFileClearReadStatistics(file->libhdfsRep); + return libhdfspp_hdfsFileClearReadStatistics(file->libhdfsppRep); } void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) { - libhdfs_hdfsFileFreeReadStatistics( - (struct libhdfs_hdfsReadStatistics *)stats); + libhdfspp_hdfsFileFreeReadStatistics((struct libhdfspp_hdfsReadStatistics *)stats); } hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) { @@ -208,15 +208,15 @@ int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, } int hdfsConfGetStr(const char *key, char **val) { - return libhdfs_hdfsConfGetStr(key, val); + return libhdfspp_hdfsConfGetStr(key, val); } int hdfsConfGetInt(const char *key, int32_t *val) { - return libhdfs_hdfsConfGetInt(key, val); + return libhdfspp_hdfsConfGetInt(key, val); } void hdfsConfStrFree(char *val) { - libhdfs_hdfsConfStrFree(val); + libhdfspp_hdfsConfStrFree(val); } int hdfsDisconnect(hdfsFS fs) { @@ -269,15 +269,30 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) { } int hdfsExists(hdfsFS fs, const char *path) { - return libhdfs_hdfsExists(fs->libhdfsRep, path); + return libhdfspp_hdfsExists(fs->libhdfsppRep, path); } int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { - return libhdfs_hdfsSeek(fs->libhdfsRep, file->libhdfsRep, desiredPos); + int ret1 = libhdfs_hdfsSeek(fs->libhdfsRep, file->libhdfsRep, desiredPos); + int ret2 = libhdfspp_hdfsSeek(fs->libhdfsppRep, file->libhdfsppRep, desiredPos); + if (ret1) { + return ret1; + } else if (ret2) { + return ret2; + } else { + return 0; + } } tOffset hdfsTell(hdfsFS fs, hdfsFile file) { - return libhdfs_hdfsTell(fs->libhdfsRep, file->libhdfsRep); + tOffset ret1 = libhdfs_hdfsTell(fs->libhdfsRep, file->libhdfsRep); + tOffset ret2 = libhdfspp_hdfsTell(fs->libhdfsppRep, file->libhdfsppRep); + if (ret1 != ret2) { + errno = EIO; + return -1; + } else { + return ret1; + } } tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { @@ -320,7 +335,7 @@ int hdfsHSync(hdfsFS fs, hdfsFile file) { } int hdfsAvailable(hdfsFS fs, hdfsFile file) { - return libhdfs_hdfsAvailable(fs->libhdfsRep, file->libhdfsRep); + return libhdfspp_hdfsAvailable(fs->libhdfsppRep, file->libhdfsppRep); } int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { @@ -340,11 +355,19 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { } char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { - return libhdfs_hdfsGetWorkingDirectory(fs->libhdfsRep, buffer, bufferSize); + return libhdfspp_hdfsGetWorkingDirectory(fs->libhdfsppRep, buffer, bufferSize); } int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { - return libhdfs_hdfsSetWorkingDirectory(fs->libhdfsRep, path); + int ret1 = libhdfspp_hdfsSetWorkingDirectory(fs->libhdfsppRep, path); + int ret2 = libhdfs_hdfsSetWorkingDirectory(fs->libhdfsRep, path); + if (ret1) { + return ret1; + } else if (ret2) { + return ret2; + } else { + return 0; + } } int hdfsCreateDirectory(hdfsFS fs, const char* path) { @@ -352,7 +375,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) { } int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { - return libhdfs_hdfsSetReplication(fs->libhdfsRep, path, replication); + return libhdfspp_hdfsSetReplication(fs->libhdfsppRep, path, replication); } hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, @@ -376,19 +399,19 @@ int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo) { char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { - return libhdfs_hdfsGetHosts(fs->libhdfsRep, path, start, length); + return libhdfspp_hdfsGetHosts(fs->libhdfsppRep, path, start, length); } void hdfsFreeHosts(char ***blockHosts) { - return libhdfs_hdfsFreeHosts(blockHosts); + return libhdfspp_hdfsFreeHosts(blockHosts); } tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { - return libhdfs_hdfsGetDefaultBlockSize(fs->libhdfsRep); + return libhdfspp_hdfsGetDefaultBlockSize(fs->libhdfsppRep); } tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) { - return libhdfs_hdfsGetDefaultBlockSizeAtPath(fs->libhdfsRep, path); + return libhdfspp_hdfsGetDefaultBlockSizeAtPath(fs->libhdfsppRep, path); } tOffset hdfsGetCapacity(hdfsFS fs) { @@ -409,7 +432,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode) { } int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { - return libhdfs_hdfsUtime(fs->libhdfsRep, path, mtime, atime); + return libhdfspp_hdfsUtime(fs->libhdfsppRep, path, mtime, atime); } struct hadoopRzOptions *hadoopRzOptionsAlloc(void) {