HDFS-9541. Add hdfsStreamBuilder API to libhdfs to support defaultBlockSizes greater than 2 GB. Contributed by Colin Patrick McCabe.

Change-Id: Ifce1b9be534dc8f7e9d2634cd60e423921b9810f
This commit is contained in:
Zhe Zhang 2016-01-26 11:24:57 -08:00
parent d0d7c22168
commit cf8af7bb45
3 changed files with 187 additions and 3 deletions

View File

@ -838,6 +838,92 @@ static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags,
int bufferSize, short replication, tSize blockSize) int bufferSize, short replication, tSize blockSize)
{
struct hdfsStreamBuilder *bld = hdfsStreamBuilderAlloc(fs, path, flags);
if (bufferSize != 0) {
hdfsStreamBuilderSetBufferSize(bld, bufferSize);
}
if (replication != 0) {
hdfsStreamBuilderSetReplication(bld, replication);
}
if (blockSize != 0) {
hdfsStreamBuilderSetDefaultBlockSize(bld, blockSize);
}
return hdfsStreamBuilderBuild(bld);
}
struct hdfsStreamBuilder {
hdfsFS fs;
int flags;
int32_t bufferSize;
int16_t replication;
int64_t defaultBlockSize;
char path[1];
};
struct hdfsStreamBuilder *hdfsStreamBuilderAlloc(hdfsFS fs,
const char *path, int flags)
{
int path_len = strlen(path);
struct hdfsStreamBuilder *bld;
// sizeof(hdfsStreamBuilder->path) includes one byte for the string
// terminator
bld = malloc(sizeof(struct hdfsStreamBuilder) + path_len);
if (!bld) {
errno = ENOMEM;
return NULL;
}
bld->fs = fs;
bld->flags = flags;
bld->bufferSize = 0;
bld->replication = 0;
bld->defaultBlockSize = 0;
memcpy(bld->path, path, path_len);
bld->path[path_len] = '\0';
return bld;
}
void hdfsStreamBuilderFree(struct hdfsStreamBuilder *bld)
{
free(bld);
}
int hdfsStreamBuilderSetBufferSize(struct hdfsStreamBuilder *bld,
int32_t bufferSize)
{
if ((bld->flags & O_ACCMODE) != O_WRONLY) {
errno = EINVAL;
return -1;
}
bld->bufferSize = bufferSize;
return 0;
}
int hdfsStreamBuilderSetReplication(struct hdfsStreamBuilder *bld,
int16_t replication)
{
if ((bld->flags & O_ACCMODE) != O_WRONLY) {
errno = EINVAL;
return -1;
}
bld->replication = replication;
return 0;
}
int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
int64_t defaultBlockSize)
{
if ((bld->flags & O_ACCMODE) != O_WRONLY) {
errno = EINVAL;
return -1;
}
bld->defaultBlockSize = defaultBlockSize;
return 0;
}
static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
int32_t bufferSize, int16_t replication, int64_t blockSize)
{ {
/* /*
JAVA EQUIVALENT: JAVA EQUIVALENT:
@ -1037,6 +1123,16 @@ done:
return file; return file;
} }
hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld)
{
hdfsFile file = hdfsOpenFileImpl(bld->fs, bld->path, bld->flags,
bld->bufferSize, bld->replication, bld->defaultBlockSize);
int prevErrno = errno;
hdfsStreamBuilderFree(bld);
errno = prevErrno;
return file;
}
int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
{ {
jobject jFS = (jobject)fs; jobject jFS = (jobject)fs;

View File

@ -80,6 +80,7 @@ extern "C" {
kObjectKindFile = 'F', kObjectKindFile = 'F',
kObjectKindDirectory = 'D', kObjectKindDirectory = 'D',
} tObjectKind; } tObjectKind;
struct hdfsStreamBuilder;
/** /**
@ -376,9 +377,11 @@ extern "C" {
LIBHDFS_EXTERNAL LIBHDFS_EXTERNAL
int hdfsDisconnect(hdfsFS fs); int hdfsDisconnect(hdfsFS fs);
/** /**
* hdfsOpenFile - Open a hdfs file in given mode. * hdfsOpenFile - Open a hdfs file in given mode.
* @deprecated Use the hdfsStreamBuilder functions instead.
* This function does not support setting block sizes bigger than 2 GB.
*
* @param fs The configured filesystem handle. * @param fs The configured filesystem handle.
* @param path The full path to the file. * @param path The full path to the file.
* @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), * @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
@ -388,13 +391,95 @@ extern "C" {
* @param replication Block replication - pass 0 if you want to use * @param replication Block replication - pass 0 if you want to use
* the default configured values. * the default configured values.
* @param blocksize Size of block - pass 0 if you want to use the * @param blocksize Size of block - pass 0 if you want to use the
* default configured values. * default configured values. Note that if you want a block size bigger
* than 2 GB, you must use the hdfsStreamBuilder API rather than this
* deprecated function.
* @return Returns the handle to the open file or NULL on error. * @return Returns the handle to the open file or NULL on error.
*/ */
LIBHDFS_EXTERNAL LIBHDFS_EXTERNAL
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize); int bufferSize, short replication, tSize blocksize);
/**
* hdfsStreamBuilderAlloc - Allocate an HDFS stream builder.
*
* @param fs The configured filesystem handle.
* @param path The full path to the file. Will be deep-copied.
* @param flags The open flags, as in hdfsOpenFile.
* @return Returns the hdfsStreamBuilder, or NULL on error.
*/
LIBHDFS_EXTERNAL
struct hdfsStreamBuilder *hdfsStreamBuilderAlloc(hdfsFS fs,
const char *path, int flags);
/**
* hdfsStreamBuilderFree - Free an HDFS file builder.
*
* It is normally not necessary to call this function since
* hdfsStreamBuilderBuild frees the builder.
*
* @param bld The hdfsStreamBuilder to free.
*/
LIBHDFS_EXTERNAL
void hdfsStreamBuilderFree(struct hdfsStreamBuilder *bld);
/**
* hdfsStreamBuilderSetBufferSize - Set the stream buffer size.
*
* @param bld The hdfs stream builder.
* @param bufferSize The buffer size to set.
*
* @return 0 on success, or -1 on error. Errno will be set on error.
*/
LIBHDFS_EXTERNAL
int hdfsStreamBuilderSetBufferSize(struct hdfsStreamBuilder *bld,
int32_t bufferSize);
/**
* hdfsStreamBuilderSetReplication - Set the replication for the stream.
* This is only relevant for output streams, which will create new blocks.
*
* @param bld The hdfs stream builder.
* @param replication The replication to set.
*
* @return 0 on success, or -1 on error. Errno will be set on error.
* If you call this on an input stream builder, you will get
* EINVAL, because this configuration is not relevant to input
* streams.
*/
LIBHDFS_EXTERNAL
int hdfsStreamBuilderSetReplication(struct hdfsStreamBuilder *bld,
int16_t replication);
/**
* hdfsStreamBuilderSetDefaultBlockSize - Set the default block size for
* the stream. This is only relevant for output streams, which will create
* new blocks.
*
* @param bld The hdfs stream builder.
* @param defaultBlockSize The default block size to set.
*
* @return 0 on success, or -1 on error. Errno will be set on error.
* If you call this on an input stream builder, you will get
* EINVAL, because this configuration is not relevant to input
* streams.
*/
LIBHDFS_EXTERNAL
int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
int64_t defaultBlockSize);
/**
* hdfsStreamBuilderBuild - Build the stream by calling open or create.
*
* @param bld The hdfs stream builder. This pointer will be freed, whether
* or not the open succeeds.
*
* @return the stream pointer on success, or NULL on error. Errno will be
* set on error.
*/
LIBHDFS_EXTERNAL
hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld);
/** /**
* hdfsTruncateFile - Truncate a hdfs file to given lenght. * hdfsTruncateFile - Truncate a hdfs file to given lenght.
* @param fs The configured filesystem handle. * @param fs The configured filesystem handle.

View File

@ -956,6 +956,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9094. Add command line option to ask NameNode reload HDFS-9094. Add command line option to ask NameNode reload
configuration. (Xiaobing Zhou via Arpit Agarwal) configuration. (Xiaobing Zhou via Arpit Agarwal)
HDFS-9541. Add hdfsStreamBuilder API to libhdfs to support defaultBlockSizes
greater than 2 GB. (cmccabe via zhz)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES