HDFS-9368. Implement reads with implicit offset state in libhdfs++. Contributed by James Clampffer.

This commit is contained in:
James 2015-11-24 15:30:08 -05:00 committed by James Clampffer
parent 6f44d92071
commit 64f0e78490
5 changed files with 169 additions and 20 deletions

View File

@ -58,6 +58,43 @@ static void ReportError(int errnum, std::string msg) {
#endif #endif
} }
/* Convert Status wrapped error into appropriate errno and return code */
static int Error(const Status &stat) {
int code = stat.code();
switch (code) {
case Status::Code::kOk:
return 0;
case Status::Code::kInvalidArgument:
ReportError(EINVAL, "Invalid argument");
break;
case Status::Code::kResourceUnavailable:
ReportError(EAGAIN, "Resource temporarily unavailable");
break;
case Status::Code::kUnimplemented:
ReportError(ENOSYS, "Function not implemented");
break;
case Status::Code::kException:
ReportError(EINTR, "Exception raised");
break;
default:
ReportError(ENOSYS, "Error: unrecognised code");
}
return -1;
}
/* return false on failure */
bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return false;
}
if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return false;
}
return true;
}
/** /**
* C API implementations * C API implementations
**/ **/
@ -110,28 +147,66 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
} }
int hdfsCloseFile(hdfsFS fs, hdfsFile file) { int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
if (!fs) { if (!CheckSystemAndHandle(fs, file)) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return -1;
}
if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return -1; return -1;
} }
delete file; delete file;
return 0; return 0;
} }
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length) { tSize length) {
if (!fs) { if (!CheckSystemAndHandle(fs, file)) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return -1;
}
if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return -1; return -1;
} }
return file->get_impl()->Pread(buffer, length, position); size_t len = length;
Status stat = file->get_impl()->Pread(buffer, &len, position);
if (!stat.ok()) {
return Error(stat);
}
return (tSize)len;
}
tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
size_t len = length;
Status stat = file->get_impl()->Read(buffer, &len);
if (!stat.ok()) {
return Error(stat);
}
return (tSize)len;
}
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
off_t desired = desiredPos;
Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
if (!stat.ok()) {
return Error(stat);
}
return (int)desired;
}
tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
ssize_t offset = 0;
Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
if (!stat.ok()) {
return Error(stat);
}
return offset;
} }

View File

@ -35,7 +35,9 @@
namespace hdfs { namespace hdfs {
ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) { FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){};
Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
auto stat = std::make_shared<std::promise<Status>>(); auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future()); std::future<Status> future(stat->get_future());
@ -49,7 +51,7 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
contacted_datanode = dn; contacted_datanode = dn;
}; };
input_stream_->PositionRead(buf, nbyte, offset, callback); input_stream_->PositionRead(buf, *nbyte, offset, callback);
/* wait for async to finish */ /* wait for async to finish */
auto s = future.get(); auto s = future.get();
@ -62,9 +64,61 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
impl->bad_node_tracker_->AddBadNode(contacted_datanode); impl->bad_node_tracker_->AddBadNode(contacted_datanode);
} }
return -1; return s;
} }
return (ssize_t)read_count; *nbyte = (size_t)read_count;
return Status::OK();
}
Status FileHandle::Read(void *buf, size_t *nbyte) {
Status stat = Pread(buf, nbyte, offset_);
if (!stat.ok()) {
return stat;
}
offset_ += *nbyte;
return Status::OK();
}
Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) {
off_t new_offset = -1;
switch (whence) {
case std::ios_base::beg:
new_offset = *offset;
break;
case std::ios_base::cur:
new_offset = offset_ + *offset;
break;
case std::ios_base::end:
new_offset = static_cast<InputStreamImpl *>(input_stream_.get())
->get_file_length() +
*offset;
break;
default:
/* unsupported */
return Status::InvalidArgument("Invalid Seek whence argument");
}
if (!CheckSeekBounds(new_offset)) {
return Status::InvalidArgument("Seek offset out of bounds");
}
offset_ = new_offset;
*offset = offset_;
return Status::OK();
}
/* return false if seek will be out of bounds */
bool FileHandle::CheckSeekBounds(ssize_t desired_position) {
ssize_t file_length =
static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length();
if (desired_position < 0 || desired_position >= file_length) {
return false;
}
return true;
} }
bool FileHandle::IsOpenForRead() { bool FileHandle::IsOpenForRead() {

View File

@ -24,6 +24,7 @@
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <chrono> #include <chrono>
#include <iostream>
#include "libhdfspp/hdfs.h" #include "libhdfspp/hdfs.h"
#include "fs/bad_datanode_tracker.h" #include "fs/bad_datanode_tracker.h"
@ -42,14 +43,31 @@ class HadoopFileSystem;
class FileHandle { class FileHandle {
public: public:
virtual ~FileHandle(){}; virtual ~FileHandle(){};
ssize_t Pread(void *buf, size_t nbyte, off_t offset); /**
* Note: The nbyte argument for Read and Pread as well as the
* offset argument for Seek are in/out parameters.
*
* For Read and Pread the value referenced by nbyte should
* be set to the number of bytes to read. Before returning
* the value referenced will be set by the callee to the number
* of bytes that was successfully read.
*
* For Seek the value referenced by offset should be the number
* of bytes to shift from the specified whence position. The
* referenced value will be set to the new offset before returning.
**/
Status Pread(void *buf, size_t *nbyte, off_t offset);
Status Read(void *buf, size_t *nbyte);
Status Seek(off_t *offset, std::ios_base::seekdir whence);
bool IsOpenForRead(); bool IsOpenForRead();
private: private:
/* handle should only be created by fs */ /* handle should only be created by fs */
friend class HadoopFileSystem; friend class HadoopFileSystem;
FileHandle(InputStream *is) : input_stream_(is){}; FileHandle(InputStream *is);
bool CheckSeekBounds(ssize_t desired_position);
std::unique_ptr<InputStream> input_stream_; std::unique_ptr<InputStream> input_stream_;
off_t offset_;
}; };
class HadoopFileSystem { class HadoopFileSystem {

View File

@ -72,7 +72,7 @@ class InputStreamImpl : public InputStream {
const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
const MutableBufferSequence &buffers, const MutableBufferSequence &buffers,
const Handler &handler); const Handler &handler);
uint64_t get_file_length() const;
private: private:
FileSystemImpl *fs_; FileSystemImpl *fs_;
unsigned long long file_length_; unsigned long long file_length_;

View File

@ -43,4 +43,6 @@ void InputStreamImpl::PositionRead(
handler) { handler) {
AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler); AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
} }
uint64_t InputStreamImpl::get_file_length() const { return file_length_; }
} }