HDFS-14478. Add libhdfs APIs for openFile (#4166)

Contributed by Sahil Takiar

Change-Id: I2f9e82a69010df7496704754515b031f2a907167
This commit is contained in:
Steve Loughran 2022-04-13 14:15:27 +01:00
parent 2112ef61e0
commit 52c6d77274
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
9 changed files with 807 additions and 11 deletions

View File

@ -454,6 +454,68 @@ int main(int argc, char **argv) {
hdfsCloseFile(lfs, localFile); hdfsCloseFile(lfs, localFile);
} }
{
// HDFS Open File Builder tests
exists = hdfsExists(fs, readPath);
if (exists) {
fprintf(stderr, "Failed to validate existence of %s\n", readPath);
shutdown_and_exit(cl, -1);
}
hdfsOpenFileBuilder *builder;
builder = hdfsOpenFileBuilderAlloc(fs, readPath);
hdfsOpenFileBuilderOpt(builder, "hello", "world");
hdfsOpenFileFuture *future;
future = hdfsOpenFileBuilderBuild(builder);
readFile = hdfsOpenFileFutureGet(future);
if (!hdfsOpenFileFutureCancel(future, 0)) {
fprintf(stderr, "Cancel on a completed Future should return false");
shutdown_and_exit(cl, -1);
}
hdfsOpenFileFutureFree(future);
memset(buffer, 0, sizeof(buffer));
num_read_bytes = hdfsRead(fs, readFile, (void *) buffer,
sizeof(buffer));
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
fprintf(stderr,
"Failed to read. Expected %s but got %s (%d bytes)\n",
fileContents, buffer, num_read_bytes);
shutdown_and_exit(cl, -1);
}
hdfsCloseFile(fs, readFile);
builder = hdfsOpenFileBuilderAlloc(fs, readPath);
hdfsOpenFileBuilderOpt(builder, "hello", "world");
future = hdfsOpenFileBuilderBuild(builder);
readFile = hdfsOpenFileFutureGetWithTimeout(future, 1, jDays);
if (!hdfsOpenFileFutureCancel(future, 0)) {
fprintf(stderr, "Cancel on a completed Future should return "
"false");
shutdown_and_exit(cl, -1);
}
hdfsOpenFileFutureFree(future);
memset(buffer, 0, sizeof(buffer));
num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer));
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
fprintf(stderr, "Failed to read. Expected %s but got "
"%s (%d bytes)\n", fileContents, buffer,
num_read_bytes);
shutdown_and_exit(cl, -1);
}
memset(buffer, 0, strlen(fileContents + 1));
hdfsCloseFile(fs, readFile);
}
totalResult = 0; totalResult = 0;
result = 0; result = 0;
{ {

View File

@ -38,6 +38,10 @@
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path" #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
// StreamCapability flags taken from o.a.h.fs.StreamCapabilities
#define IS_READ_BYTE_BUFFER_CAPABILITY "in:readbytebuffer"
#define IS_PREAD_BYTE_BUFFER_CAPABILITY "in:preadbytebuffer"
// Bit fields for hdfsFile_internal flags // Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1) #define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
@ -1075,6 +1079,27 @@ done:
return 0; return 0;
} }
/**
* Sets the flags of the given hdfsFile based on the capabilities of the
* underlying stream.
*
* @param file file->flags will be updated based on the capabilities of jFile
* @param jFile the underlying stream to check for capabilities
*/
static void setFileFlagCapabilities(hdfsFile file, jobject jFile) {
// Check the StreamCapabilities of jFile to see if we can do direct
// reads
if (hdfsHasStreamCapability(jFile, IS_READ_BYTE_BUFFER_CAPABILITY)) {
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
}
// Check the StreamCapabilities of jFile to see if we can do direct
// preads
if (hdfsHasStreamCapability(jFile, IS_PREAD_BYTE_BUFFER_CAPABILITY)) {
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
}
}
static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
int32_t bufferSize, int16_t replication, int64_t blockSize) int32_t bufferSize, int16_t replication, int64_t blockSize)
{ {
@ -1245,17 +1270,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
file->flags = 0; file->flags = 0;
if ((flags & O_WRONLY) == 0) { if ((flags & O_WRONLY) == 0) {
// Check the StreamCapabilities of jFile to see if we can do direct setFileFlagCapabilities(file, jFile);
// reads
if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
}
// Check the StreamCapabilities of jFile to see if we can do direct
// preads
if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
}
} }
ret = 0; ret = 0;
@ -1288,6 +1303,469 @@ hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld)
return file; return file;
} }
/**
* A wrapper around o.a.h.fs.FutureDataInputStreamBuilder and the file name
* associated with the builder.
*/
struct hdfsOpenFileBuilder {
jobject jBuilder;
const char *path;
};
/**
* A wrapper around a java.util.concurrent.Future (created by calling
* FutureDataInputStreamBuilder#build) and the file name associated with the
* builder.
*/
struct hdfsOpenFileFuture {
jobject jFuture;
const char *path;
};
hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
const char *path) {
int ret = 0;
jthrowable jthr;
jvalue jVal;
jobject jFS = (jobject) fs;
jobject jPath = NULL;
jobject jBuilder = NULL;
JNIEnv *env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return NULL;
}
hdfsOpenFileBuilder *builder;
builder = calloc(1, sizeof(hdfsOpenFileBuilder));
if (!builder) {
fprintf(stderr, "hdfsOpenFileBuilderAlloc(%s): OOM when creating "
"hdfsOpenFileBuilder\n", path);
errno = ENOMEM;
goto done;
}
builder->path = path;
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileBuilderAlloc(%s): constructNewObjectOfPath",
path);
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
"openFile", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FDISB)),
jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileBuilderAlloc(%s): %s#openFile(Path) failed",
HADOOP_FS, path);
goto done;
}
jBuilder = jVal.l;
builder->jBuilder = (*env)->NewGlobalRef(env, jBuilder);
if (!builder->jBuilder) {
printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsOpenFileBuilderAlloc(%s): NewGlobalRef(%s) failed", path,
HADOOP_FDISB);
ret = EINVAL;
goto done;
}
done:
destroyLocalReference(env, jPath);
destroyLocalReference(env, jBuilder);
if (ret) {
if (builder) {
if (builder->jBuilder) {
(*env)->DeleteGlobalRef(env, builder->jBuilder);
}
free(builder);
}
errno = ret;
return NULL;
}
return builder;
}
/**
* Used internally by hdfsOpenFileBuilderWithOption to switch between
* FSBuilder#must and #opt.
*/
typedef enum { must, opt } openFileBuilderOptionType;
/**
* Shared implementation of hdfsOpenFileBuilderMust and hdfsOpenFileBuilderOpt
* that switches between each method depending on the value of
* openFileBuilderOptionType.
*/
static hdfsOpenFileBuilder *hdfsOpenFileBuilderWithOption(
hdfsOpenFileBuilder *builder, const char *key,
const char *value, openFileBuilderOptionType optionType) {
int ret = 0;
jthrowable jthr;
jvalue jVal;
jobject localJBuilder = NULL;
jobject globalJBuilder;
jstring jKeyString = NULL;
jstring jValueString = NULL;
// If the builder was not previously created by a prior call to
// hdfsOpenFileBuilderAlloc then exit
if (builder == NULL || builder->jBuilder == NULL) {
errno = EINVAL;
return NULL;
}
JNIEnv *env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return NULL;
}
jthr = newJavaStr(env, key, &jKeyString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
builder->path, key);
goto done;
}
jthr = newJavaStr(env, value, &jValueString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
builder->path, value);
goto done;
}
const char *optionTypeMethodName;
switch (optionType) {
case must:
optionTypeMethodName = "must";
break;
case opt:
optionTypeMethodName = "opt";
break;
default:
ret = EINTERNAL;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
JC_FUTURE_DATA_IS_BUILDER, optionTypeMethodName,
JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING),
JPARAM(HADOOP_FS_BLDR)), jKeyString,
jValueString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileBuilderWithOption(%s): %s#%s(%s, %s) failed",
builder->path, HADOOP_FS_BLDR, optionTypeMethodName, key,
value);
goto done;
}
localJBuilder = jVal.l;
globalJBuilder = (*env)->NewGlobalRef(env, localJBuilder);
if (!globalJBuilder) {
printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsOpenFileBuilderWithOption(%s): NewGlobalRef(%s) failed",
builder->path, HADOOP_FDISB);
ret = EINVAL;
goto done;
}
(*env)->DeleteGlobalRef(env, builder->jBuilder);
builder->jBuilder = globalJBuilder;
done:
destroyLocalReference(env, jKeyString);
destroyLocalReference(env, jValueString);
destroyLocalReference(env, localJBuilder);
if (ret) {
errno = ret;
return NULL;
}
return builder;
}
hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
const char *key, const char *value) {
openFileBuilderOptionType optionType;
optionType = must;
return hdfsOpenFileBuilderWithOption(builder, key, value, optionType);
}
hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
const char *key, const char *value) {
openFileBuilderOptionType optionType;
optionType = opt;
return hdfsOpenFileBuilderWithOption(builder, key, value, optionType);
}
hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder) {
int ret = 0;
jthrowable jthr;
jvalue jVal;
jobject jFuture = NULL;
// If the builder was not previously created by a prior call to
// hdfsOpenFileBuilderAlloc then exit
if (builder == NULL || builder->jBuilder == NULL) {
ret = EINVAL;
return NULL;
}
JNIEnv *env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return NULL;
}
hdfsOpenFileFuture *future;
future = calloc(1, sizeof(hdfsOpenFileFuture));
if (!future) {
fprintf(stderr, "hdfsOpenFileBuilderBuild: OOM when creating "
"hdfsOpenFileFuture\n");
errno = ENOMEM;
goto done;
}
future->path = builder->path;
jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
JC_FUTURE_DATA_IS_BUILDER, "build",
JMETHOD1("", JPARAM(JAVA_CFUTURE)));
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileBuilderBuild(%s): %s#build() failed",
builder->path, HADOOP_FDISB);
goto done;
}
jFuture = jVal.l;
future->jFuture = (*env)->NewGlobalRef(env, jFuture);
if (!future->jFuture) {
printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsOpenFileBuilderBuild(%s): NewGlobalRef(%s) failed",
builder->path, JAVA_CFUTURE);
ret = EINVAL;
goto done;
}
done:
destroyLocalReference(env, jFuture);
if (ret) {
if (future) {
if (future->jFuture) {
(*env)->DeleteGlobalRef(env, future->jFuture);
}
free(future);
}
hdfsOpenFileBuilderFree(builder);
errno = ret;
return NULL;
}
hdfsOpenFileBuilderFree(builder);
return future;
}
void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
JNIEnv *env;
env = getJNIEnv();
if (!env) {
return;
}
if (builder->jBuilder) {
(*env)->DeleteGlobalRef(env, builder->jBuilder);
builder->jBuilder = NULL;
}
free(builder);
}
/**
* Shared implementation of hdfsOpenFileFutureGet and
* hdfsOpenFileFutureGetWithTimeout. If a timeout is specified, calls
* Future#get() otherwise it calls Future#get(long, TimeUnit).
*/
static hdfsFile fileFutureGetWithTimeout(hdfsOpenFileFuture *future,
int64_t timeout, jobject jTimeUnit) {
int ret = 0;
jthrowable jthr;
jvalue jVal;
hdfsFile file = NULL;
jobject jFile = NULL;
JNIEnv *env = getJNIEnv();
if (!env) {
ret = EINTERNAL;
return NULL;
}
if (!jTimeUnit) {
jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
JC_CFUTURE, "get", JMETHOD1("", JPARAM(JAVA_OBJECT)));
} else {
jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
JC_CFUTURE, "get", JMETHOD2("J",
JPARAM(JAVA_TIMEUNIT), JPARAM(JAVA_OBJECT)), timeout,
jTimeUnit);
}
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileFutureGet(%s): %s#get failed", future->path,
JAVA_CFUTURE);
goto done;
}
file = calloc(1, sizeof(struct hdfsFile_internal));
if (!file) {
fprintf(stderr, "hdfsOpenFileFutureGet(%s): OOM when creating "
"hdfsFile\n", future->path);
ret = ENOMEM;
goto done;
}
jFile = jVal.l;
file->file = (*env)->NewGlobalRef(env, jFile);
if (!file->file) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsOpenFileFutureGet(%s): NewGlobalRef(jFile) failed",
future->path);
goto done;
}
file->type = HDFS_STREAM_INPUT;
file->flags = 0;
setFileFlagCapabilities(file, jFile);
done:
destroyLocalReference(env, jTimeUnit);
destroyLocalReference(env, jFile);
if (ret) {
if (file) {
if (file->file) {
(*env)->DeleteGlobalRef(env, file->file);
}
free(file);
}
errno = ret;
return NULL;
}
return file;
}
hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
return fileFutureGetWithTimeout(future, -1, NULL);
}
hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
int64_t timeout, javaConcurrentTimeUnit timeUnit) {
int ret = 0;
jthrowable jthr;
jobject jTimeUnit = NULL;
JNIEnv *env = getJNIEnv();
if (!env) {
ret = EINTERNAL;
return NULL;
}
const char *timeUnitEnumName;
switch (timeUnit) {
case jNanoseconds:
timeUnitEnumName = "NANOSECONDS";
break;
case jMicroseconds:
timeUnitEnumName = "MICROSECONDS";
break;
case jMilliseconds:
timeUnitEnumName = "MILLISECONDS";
break;
case jSeconds:
timeUnitEnumName = "SECONDS";
break;
case jMinutes:
timeUnitEnumName = "MINUTES";
break;
case jHours:
timeUnitEnumName = "HOURS";
break;
case jDays:
timeUnitEnumName = "DAYS";
break;
default:
ret = EINTERNAL;
goto done;
}
jthr = fetchEnumInstance(env, JAVA_TIMEUNIT, timeUnitEnumName, &jTimeUnit);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileFutureGet(%s): %s#get failed", future->path,
JAVA_CFUTURE);
goto done;
}
return fileFutureGetWithTimeout(future, timeout, jTimeUnit);
done:
if (ret) {
errno = ret;
}
return NULL;
}
int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
int mayInterruptIfRunning) {
int ret = 0;
jthrowable jthr;
jvalue jVal;
jboolean jMayInterruptIfRunning;
JNIEnv *env = getJNIEnv();
if (!env) {
ret = EINTERNAL;
return -1;
}
jMayInterruptIfRunning = mayInterruptIfRunning ? JNI_TRUE : JNI_FALSE;
jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture, JC_CFUTURE,
"cancel", JMETHOD1("Z", "Z"), jMayInterruptIfRunning);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFileFutureCancel(%s): %s#cancel failed", future->path,
JAVA_CFUTURE);
goto done;
}
done:
if (ret) {
errno = ret;
return -1;
}
if (!jVal.z) {
return -1;
}
return 0;
}
void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
JNIEnv *env;
env = getJNIEnv();
if (!env) {
return;
}
if (future->jFuture) {
(*env)->DeleteGlobalRef(env, future->jFuture);
future->jFuture = NULL;
}
free(future);
}
int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
{ {
jobject jFS = (jobject)fs; jobject jFS = (jobject)fs;

View File

@ -82,6 +82,29 @@ extern "C" {
} tObjectKind; } tObjectKind;
struct hdfsStreamBuilder; struct hdfsStreamBuilder;
/**
* The C reflection of the enum values from java.util.concurrent.TimeUnit .
*/
typedef enum javaConcurrentTimeUnit {
jNanoseconds,
jMicroseconds,
jMilliseconds,
jSeconds,
jMinutes,
jHours,
jDays,
} javaConcurrentTimeUnit;
/**
* The C reflection of java.util.concurrent.Future specifically used for
* opening HDFS files asynchronously.
*/
typedef struct hdfsOpenFileFuture hdfsOpenFileFuture;
/**
* The C reflection of o.a.h.fs.FutureDataInputStreamBuilder .
*/
typedef struct hdfsOpenFileBuilder hdfsOpenFileBuilder;
/** /**
* The C reflection of org.apache.org.hadoop.FileSystem . * The C reflection of org.apache.org.hadoop.FileSystem .
@ -429,6 +452,118 @@ extern "C" {
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize); int bufferSize, short replication, tSize blocksize);
/**
* hdfsOpenFileBuilderAlloc - Allocate a HDFS open file builder.
*
* @param fs The configured filesystem handle.
* @param path The full path to the file.
* @return Returns the hdfsOpenFileBuilder, or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
const char *path);
/**
* hdfsOpenFileBuilderMust - Specifies a mandatory parameter for the open
* file builder. While the underlying FsBuilder supports various various
* types for the value (boolean, int, float, double), currently only
* strings are supported.
*
* @param builder The open file builder to set the config for.
* @param key The config key
* @param value The config value
* @return Returns the hdfsOpenFileBuilder, or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
const char *key, const char *value);
/**
* hdfsOpenFileBuilderOpt - Specifies an optional parameter for the open
* file builder. While the underlying FsBuilder supports various various
* types for the value (boolean, int, float, double), currently only
* strings are supported.
*
* @param builder The open file builder to set the config for.
* @param key The config key
* @param value The config value
* @return Returns the hdfsOpenFileBuilder, or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
const char *key, const char *value);
/**
* hdfsOpenFileBuilderBuild - Builds the open file builder and returns a
* hdfsOpenFileFuture which tracks the asynchronous call to open the
* specified file.
*
* @param builder The open file builder to build.
* @return Returns the hdfsOpenFileFuture, or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder);
/**
* hdfsOpenFileBuilderFree - Free a HDFS open file builder.
*
* It is normally not necessary to call this function since
* hdfsOpenFileBuilderBuild frees the builder.
*
* @param builder The hdfsOpenFileBuilder to free.
*/
LIBHDFS_EXTERNAL
void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder);
/**
* hdfsOpenFileFutureGet - Call Future#get() on the underlying Java Future
* object. A call to #get() will block until the asynchronous operation has
* completed. In this case, until the open file call has completed. This
* method blocks indefinitely until blocking call completes.
*
* @param future The hdfsOpenFileFuture to call #get on
* @return Returns the opened hdfsFile, or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future);
/**
* hdfsOpenFileFutureGetWithTimeout - Call Future#get(long, TimeUnit) on
* the underlying Java Future object. A call to #get(long, TimeUnit) will
* block until the asynchronous operation has completed (in this case,
* until the open file call has completed) or the specified timeout has
* been reached.
*
* @param future The hdfsOpenFileFuture to call #get on
* @return Returns the opened hdfsFile, or NULL on error or if the timeout
* has been reached.
*/
LIBHDFS_EXTERNAL
hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
int64_t timeout, javaConcurrentTimeUnit timeUnit);
/**
* hdfsOpenFileFutureCancel - Call Future#cancel(boolean) on the
* underlying Java Future object. The value of mayInterruptedIfRunning
* controls whether the Java thread running the Future should be
* interrupted or not.
*
* @param future The hdfsOpenFileFuture to call #cancel on
* @param mayInterruptIfRunning if true, interrupts the running thread
* @return Returns 0 if the thread was successfully cancelled, else -1
*/
LIBHDFS_EXTERNAL
int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
int mayInterruptIfRunning);
/**
* hdfsOpenFileFutureFree - Free a HDFS open file future.
*
* @param hdfsOpenFileFuture The hdfsOpenFileFuture to free.
*/
LIBHDFS_EXTERNAL
void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future);
/** /**
* hdfsStreamBuilderAlloc - Allocate an HDFS stream builder. * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder.
* *

View File

@ -98,6 +98,8 @@ jthrowable initCachedClasses(JNIEnv* env) {
"org/apache/hadoop/hdfs/ReadStatistics"; "org/apache/hadoop/hdfs/ReadStatistics";
cachedJavaClasses[JC_HDFS_DATA_INPUT_STREAM].className = cachedJavaClasses[JC_HDFS_DATA_INPUT_STREAM].className =
"org/apache/hadoop/hdfs/client/HdfsDataInputStream"; "org/apache/hadoop/hdfs/client/HdfsDataInputStream";
cachedJavaClasses[JC_FUTURE_DATA_IS_BUILDER].className =
"org/apache/hadoop/fs/FutureDataInputStreamBuilder";
cachedJavaClasses[JC_DOMAIN_SOCKET].className = cachedJavaClasses[JC_DOMAIN_SOCKET].className =
"org/apache/hadoop/net/unix/DomainSocket"; "org/apache/hadoop/net/unix/DomainSocket";
cachedJavaClasses[JC_URI].className = cachedJavaClasses[JC_URI].className =
@ -108,6 +110,8 @@ jthrowable initCachedClasses(JNIEnv* env) {
"java/util/EnumSet"; "java/util/EnumSet";
cachedJavaClasses[JC_EXCEPTION_UTILS].className = cachedJavaClasses[JC_EXCEPTION_UTILS].className =
"org/apache/commons/lang3/exception/ExceptionUtils"; "org/apache/commons/lang3/exception/ExceptionUtils";
cachedJavaClasses[JC_CFUTURE].className =
"java/util/concurrent/CompletableFuture";
// Create and set the jclass objects based on the class names set above // Create and set the jclass objects based on the class names set above
jthrowable jthr; jthrowable jthr;

View File

@ -54,11 +54,13 @@ typedef enum {
JC_FS_PERMISSION, JC_FS_PERMISSION,
JC_READ_STATISTICS, JC_READ_STATISTICS,
JC_HDFS_DATA_INPUT_STREAM, JC_HDFS_DATA_INPUT_STREAM,
JC_FUTURE_DATA_IS_BUILDER,
JC_DOMAIN_SOCKET, JC_DOMAIN_SOCKET,
JC_URI, JC_URI,
JC_BYTE_BUFFER, JC_BYTE_BUFFER,
JC_ENUM_SET, JC_ENUM_SET,
JC_EXCEPTION_UTILS, JC_EXCEPTION_UTILS,
JC_CFUTURE,
// A special marker enum that counts the number of cached jclasses // A special marker enum that counts the number of cached jclasses
NUM_CACHED_CLASSES NUM_CACHED_CLASSES
} CachedJavaClass; } CachedJavaClass;
@ -95,6 +97,8 @@ const char *getClassName(CachedJavaClass cachedJavaClass);
#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission" #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
#define HADOOP_RSTAT "org/apache/hadoop/hdfs/ReadStatistics" #define HADOOP_RSTAT "org/apache/hadoop/hdfs/ReadStatistics"
#define HADOOP_HDISTRM "org/apache/hadoop/hdfs/client/HdfsDataInputStream" #define HADOOP_HDISTRM "org/apache/hadoop/hdfs/client/HdfsDataInputStream"
#define HADOOP_FDISB "org/apache/hadoop/fs/FutureDataInputStreamBuilder"
#define HADOOP_FS_BLDR "org/apache/hadoop/fs/FSBuilder"
#define HADOOP_RO "org/apache/hadoop/fs/ReadOption" #define HADOOP_RO "org/apache/hadoop/fs/ReadOption"
#define HADOOP_DS "org/apache/hadoop/net/unix/DomainSocket" #define HADOOP_DS "org/apache/hadoop/net/unix/DomainSocket"
@ -104,6 +108,9 @@ const char *getClassName(CachedJavaClass cachedJavaClass);
#define JAVA_BYTEBUFFER "java/nio/ByteBuffer" #define JAVA_BYTEBUFFER "java/nio/ByteBuffer"
#define JAVA_STRING "java/lang/String" #define JAVA_STRING "java/lang/String"
#define JAVA_ENUMSET "java/util/EnumSet" #define JAVA_ENUMSET "java/util/EnumSet"
#define JAVA_CFUTURE "java/util/concurrent/CompletableFuture"
#define JAVA_TIMEUNIT "java/util/concurrent/TimeUnit"
#define JAVA_OBJECT "java/lang/Object"
/* Some frequently used third-party class names */ /* Some frequently used third-party class names */

View File

@ -250,6 +250,65 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
return ret; return ret;
} }
hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
const char *path) {
return libhdfs_hdfsOpenFileBuilderAlloc(fs->libhdfsRep, path);
}
hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(
hdfsOpenFileBuilder *builder, const char *key,
const char *value) {
return libhdfs_hdfsOpenFileBuilderMust(builder, key, value);
}
hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(
hdfsOpenFileBuilder *builder, const char *key,
const char *value) {
return libhdfs_hdfsOpenFileBuilderOpt(builder, key, value);
}
hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(
hdfsOpenFileBuilder *builder) {
return libhdfs_hdfsOpenFileBuilderBuild(builder);
}
void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
libhdfs_hdfsOpenFileBuilderFree(builder);
}
hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
hdfsFile ret = calloc(1, sizeof(struct hdfsFile_internal));
ret->libhdfsppRep = 0;
ret->libhdfsRep = libhdfs_hdfsOpenFileFutureGet(future);
if (!ret->libhdfsRep) {
free(ret);
ret = NULL;
}
return ret;
}
hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
int64_t timeout, javaConcurrentTimeUnit timeUnit) {
hdfsFile ret = calloc(1, sizeof(struct hdfsFile_internal));
ret->libhdfsppRep = 0;
ret->libhdfsRep = libhdfs_hdfsOpenFileFutureGetWithTimeout(future, timeout,
timeUnit);
if (!ret->libhdfsRep) {
free(ret);
ret = NULL;
}
return ret;
}
int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
int mayInterruptIfRunning) {
return libhdfs_hdfsOpenFileFutureCancel(future, mayInterruptIfRunning);
}
void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
libhdfs_hdfsOpenFileFutureFree(future);
}
int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) { int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) {
return libhdfs_hdfsTruncateFile(fs->libhdfsRep, path, newlength); return libhdfs_hdfsTruncateFile(fs->libhdfsRep, path, newlength);
} }

View File

@ -39,6 +39,23 @@
#define hdfsConfStrFree libhdfs_hdfsConfStrFree #define hdfsConfStrFree libhdfs_hdfsConfStrFree
#define hdfsDisconnect libhdfs_hdfsDisconnect #define hdfsDisconnect libhdfs_hdfsDisconnect
#define hdfsOpenFile libhdfs_hdfsOpenFile #define hdfsOpenFile libhdfs_hdfsOpenFile
#define hdfsOpenFileBuilderAlloc libhdfs_hdfsOpenFileBuilderAlloc
#define hdfsOpenFileBuilderMust libhdfs_hdfsOpenFileBuilderMust
#define hdfsOpenFileBuilderOpt libhdfs_hdfsOpenFileBuilderOpt
#define hdfsOpenFileBuilderBuild libhdfs_hdfsOpenFileBuilderBuild
#define hdfsOpenFileBuilderFree libhdfs_hdfsOpenFileBuilderFree
#define hdfsOpenFileFutureGet libhdfs_hdfsOpenFileFutureGet
#define javaConcurrentTimeUnit libhdfs_javaConcurrentTimeUnit
#define jNanoseconds libhdfs_jNanoseconds
#define jMicroseconds libhdfs_jMicroseconds
#define jMilliseconds libhdfs_jMilliseconds
#define jSeconds libhdfsj_jSeconds
#define jMinutes libhdfs_jMinutes
#define jHours libhdfs_jHours
#define jDays libhdfs_jDays
#define hdfsOpenFileFutureGetWithTimeout libhdfs_hdfsOpenFileFutureGetWithTimeout
#define hdfsOpenFileFutureCancel libhdfs_hdfsOpenFileFutureCancel
#define hdfsOpenFileFutureFree libhdfs_hdfsOpenFileFutureFree
#define hdfsTruncateFile libhdfs_hdfsTruncateFile #define hdfsTruncateFile libhdfs_hdfsTruncateFile
#define hdfsUnbufferFile libhdfs_hdfsUnbufferFile #define hdfsUnbufferFile libhdfs_hdfsUnbufferFile
#define hdfsCloseFile libhdfs_hdfsCloseFile #define hdfsCloseFile libhdfs_hdfsCloseFile

View File

@ -39,6 +39,23 @@
#undef hdfsConfStrFree #undef hdfsConfStrFree
#undef hdfsDisconnect #undef hdfsDisconnect
#undef hdfsOpenFile #undef hdfsOpenFile
#undef hdfsOpenFileBuilderAlloc
#undef hdfsOpenFileBuilderMust
#undef hdfsOpenFileBuilderOpt
#undef hdfsOpenFileBuilderBuild
#undef hdfsOpenFileBuilderFree
#undef hdfsOpenFileFutureGet
#undef javaConcurrentTimeUnit
#undef jNanoseconds
#undef jMicroseconds
#undef jMilliseconds
#undef jSeconds
#undef jMinutes
#undef jHours
#undef jDays
#undef hdfsOpenFileFutureGetWithTimeout
#undef hdfsOpenFileFutureCancel
#undef hdfsOpenFileFutureFree
#undef hdfsTruncateFile #undef hdfsTruncateFile
#undef hdfsUnbufferFile #undef hdfsUnbufferFile
#undef hdfsCloseFile #undef hdfsCloseFile

View File

@ -39,6 +39,23 @@
#define hdfsConfStrFree libhdfspp_hdfsConfStrFree #define hdfsConfStrFree libhdfspp_hdfsConfStrFree
#define hdfsDisconnect libhdfspp_hdfsDisconnect #define hdfsDisconnect libhdfspp_hdfsDisconnect
#define hdfsOpenFile libhdfspp_hdfsOpenFile #define hdfsOpenFile libhdfspp_hdfsOpenFile
#define hdfsOpenFileBuilderAlloc libhdfspp_hdfsOpenFileBuilderAlloc
#define hdfsOpenFileBuilderMust libhdfspp_hdfsOpenFileBuilderMust
#define hdfsOpenFileBuilderOpt libhdfspp_hdfsOpenFileBuilderOpt
#define hdfsOpenFileBuilderBuild libhdfspp_hdfsOpenFileBuilderBuild
#define hdfsOpenFileBuilderFree libhdfspp_hdfsOpenFileBuilderFree
#define hdfsOpenFileFutureGet libhdfspp_hdfsOpenFileFutureGet
#define javaConcurrentTimeUnit libhdfspp_javaConcurrentTimeUnit
#define jNanoseconds libhdfspp_jNanoseconds
#define jMicroseconds libhdfspp_jMicroseconds
#define jMilliseconds libhdfspp_jMilliseconds
#define jSeconds libhdfspp_jSeconds
#define jMinutes libhdfspp_jMinutes
#define jHours libhdfspp_jHours
#define jDays libhdfspp_jDays
#define hdfsOpenFileFutureGetWithTimeout libhdfspp_hdfsOpenFileFutureGetWithTimeout
#define hdfsOpenFileFutureCancel libhdfspp_hdfsOpenFileFutureCancel
#define hdfsOpenFileFutureFree libhdfspp_hdfsOpenFileFutureFree
#define hdfsTruncateFile libhdfspp_hdfsTruncateFile #define hdfsTruncateFile libhdfspp_hdfsTruncateFile
#define hdfsUnbufferFile libhdfspp_hdfsUnbufferFile #define hdfsUnbufferFile libhdfspp_hdfsUnbufferFile
#define hdfsCloseFile libhdfspp_hdfsCloseFile #define hdfsCloseFile libhdfspp_hdfsCloseFile