HDFS-14015. Improve error handling in hdfsThreadDestructor in native thread local storage

Change-Id: Ida1e888c9231b9e46081338e3a206d8f6faabd36
This commit is contained in:
Daniel Templeton 2018-11-16 16:24:10 -08:00
parent 29374999b6
commit e56d9f2618
2 changed files with 123 additions and 5 deletions

View File

@ -23,12 +23,17 @@
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
#define UNKNOWN "UNKNOWN"
#define MAXTHRID 256
/** Key that allows us to retrieve thread-local storage */ /** Key that allows us to retrieve thread-local storage */
static pthread_key_t gTlsKey; static pthread_key_t gTlsKey;
/** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */ /** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */
static int gTlsKeyInitialized = 0; static int gTlsKeyInitialized = 0;
static void get_current_thread_id(JNIEnv* env, char* id, int max);
/** /**
* The function that is called whenever a thread with libhdfs thread local data * The function that is called whenever a thread with libhdfs thread local data
* is destroyed. * is destroyed.
@ -41,16 +46,26 @@ void hdfsThreadDestructor(void *v)
struct ThreadLocalState *state = (struct ThreadLocalState*)v; struct ThreadLocalState *state = (struct ThreadLocalState*)v;
JNIEnv *env = state->env;; JNIEnv *env = state->env;;
jint ret; jint ret;
char thr_name[MAXTHRID];
/* Detach the current thread from the JVM */ /* Detach the current thread from the JVM */
if (env) { if (env) {
ret = (*env)->GetJavaVM(env, &vm); ret = (*env)->GetJavaVM(env, &vm);
if (ret) {
if (ret != 0) {
fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with error %d\n", fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with error %d\n",
ret); ret);
(*env)->ExceptionDescribe(env); (*env)->ExceptionDescribe(env);
} else { } else {
(*vm)->DetachCurrentThread(vm); ret = (*vm)->DetachCurrentThread(vm);
if (ret != JNI_OK) {
(*env)->ExceptionDescribe(env);
get_current_thread_id(env, thr_name, MAXTHRID);
fprintf(stderr, "hdfsThreadDestructor: Unable to detach thread %s "
"from the JVM. Error code: %d\n", thr_name, ret);
}
} }
} }
@ -62,13 +77,57 @@ void hdfsThreadDestructor(void *v)
free(state); free(state);
} }
static void get_current_thread_id(JNIEnv* env, char* id, int max) {
jclass cls;
jmethodID mth;
jobject thr;
jstring thr_name;
jlong thr_id = 0;
const char *thr_name_str;
cls = (*env)->FindClass(env, "java/lang/Thread");
mth = (*env)->GetStaticMethodID(env, cls, "currentThread",
"()Ljava/lang/Thread;");
thr = (*env)->CallStaticObjectMethod(env, cls, mth);
if (thr != NULL) {
mth = (*env)->GetMethodID(env, cls, "getId", "()J");
thr_id = (*env)->CallLongMethod(env, thr, mth);
(*env)->ExceptionDescribe(env);
mth = (*env)->GetMethodID(env, cls, "toString", "()Ljava/lang/String;");
thr_name = (jstring)(*env)->CallObjectMethod(env, thr, mth);
if (thr_name != NULL) {
thr_name_str = (*env)->GetStringUTFChars(env, thr_name, NULL);
// Treating the jlong as a long *should* be safe
snprintf(id, max, "%s:%ld", thr_name_str, thr_id);
// Release the char*
(*env)->ReleaseStringUTFChars(env, thr_name, thr_name_str);
} else {
(*env)->ExceptionDescribe(env);
// Treating the jlong as a long *should* be safe
snprintf(id, max, "%s:%ld", UNKNOWN, thr_id);
}
} else {
(*env)->ExceptionDescribe(env);
snprintf(id, max, "%s", UNKNOWN);
}
// Make sure the id is null terminated in case we overflow the max length
id[max - 1] = '\0';
}
struct ThreadLocalState* threadLocalStorageCreate() struct ThreadLocalState* threadLocalStorageCreate()
{ {
struct ThreadLocalState *state; struct ThreadLocalState *state;
state = (struct ThreadLocalState*)malloc(sizeof(struct ThreadLocalState)); state = (struct ThreadLocalState*)malloc(sizeof(struct ThreadLocalState));
if (state == NULL) { if (state == NULL) {
fprintf(stderr, fprintf(stderr,
"threadLocalStorageSet: OOM - Unable to allocate thread local state\n"); "threadLocalStorageCreate: OOM - Unable to allocate thread local state\n");
return NULL; return NULL;
} }
state->lastExceptionStackTrace = NULL; state->lastExceptionStackTrace = NULL;

View File

@ -23,9 +23,14 @@
#include <stdio.h> #include <stdio.h>
#include <windows.h> #include <windows.h>
#define UNKNOWN "UNKNOWN"
#define MAXTHRID 256
/** Key that allows us to retrieve thread-local storage */ /** Key that allows us to retrieve thread-local storage */
static DWORD gTlsIndex = TLS_OUT_OF_INDEXES; static DWORD gTlsIndex = TLS_OUT_OF_INDEXES;
static void get_current_thread_id(JNIEnv* env, char* id, int max);
/** /**
* If the current thread has a JNIEnv in thread-local storage, then detaches the * If the current thread has a JNIEnv in thread-local storage, then detaches the
* current thread from the JVM and also frees up the ThreadLocalState object. * current thread from the JVM and also frees up the ThreadLocalState object.
@ -36,6 +41,8 @@ static void detachCurrentThreadFromJvm()
JNIEnv *env = NULL; JNIEnv *env = NULL;
JavaVM *vm; JavaVM *vm;
jint ret; jint ret;
char thr_name[MAXTHRID];
if (threadLocalStorageGet(&state) || !state) { if (threadLocalStorageGet(&state) || !state) {
return; return;
} }
@ -50,7 +57,15 @@ static void detachCurrentThreadFromJvm()
ret); ret);
(*env)->ExceptionDescribe(env); (*env)->ExceptionDescribe(env);
} else { } else {
(*vm)->DetachCurrentThread(vm); ret = (*vm)->DetachCurrentThread(vm);
if (ret != JNI_OK) {
(*env)->ExceptionDescribe(env);
get_current_thread_id(env, thr_name, MAXTHRID);
fprintf(stderr, "detachCurrentThreadFromJvm: Unable to detach thread %s "
"from the JVM. Error code: %d\n", thr_name, ret);
}
} }
/* Free exception strings */ /* Free exception strings */
@ -61,6 +76,50 @@ static void detachCurrentThreadFromJvm()
free(state); free(state);
} }
static void get_current_thread_id(JNIEnv* env, char* id, int max) {
jclass cls;
jmethodID mth;
jobject thr;
jstring thr_name;
jlong thr_id = 0;
const char *thr_name_str;
cls = (*env)->FindClass(env, "java/lang/Thread");
mth = (*env)->GetStaticMethodID(env, cls, "currentThread",
"()Ljava/lang/Thread;");
thr = (*env)->CallStaticObjectMethod(env, cls, mth);
if (thr != NULL) {
mth = (*env)->GetMethodID(env, cls, "getId", "()J");
thr_id = (*env)->CallLongMethod(env, thr, mth);
(*env)->ExceptionDescribe(env);
mth = (*env)->GetMethodID(env, cls, "toString", "()Ljava/lang/String;");
thr_name = (jstring)(*env)->CallObjectMethod(env, thr, mth);
if (thr_name != NULL) {
thr_name_str = (*env)->GetStringUTFChars(env, thr_name, NULL);
// Treating the jlong as a long *should* be safe
snprintf(id, max, "%s:%ld", thr_name_str, thr_id);
// Release the char*
(*env)->ReleaseStringUTFChars(env, thr_name, thr_name_str);
} else {
(*env)->ExceptionDescribe(env);
// Treating the jlong as a long *should* be safe
snprintf(id, max, "%s:%ld", UNKNOWN, thr_id);
}
} else {
(*env)->ExceptionDescribe(env);
snprintf(id, max, "%s", UNKNOWN);
}
// Make sure the id is null terminated in case we overflow the max length
id[max - 1] = '\0';
}
void hdfsThreadDestructor(void *v) void hdfsThreadDestructor(void *v)
{ {
// Ignore 'v' since it will contain the state and we will obtain it in the below // Ignore 'v' since it will contain the state and we will obtain it in the below
@ -148,7 +207,7 @@ struct ThreadLocalState* threadLocalStorageCreate()
state = (struct ThreadLocalState*)malloc(sizeof(struct ThreadLocalState)); state = (struct ThreadLocalState*)malloc(sizeof(struct ThreadLocalState));
if (state == NULL) { if (state == NULL) {
fprintf(stderr, fprintf(stderr,
"threadLocalStorageSet: OOM - Unable to allocate thread local state\n"); "threadLocalStorageCreate: OOM - Unable to allocate thread local state\n");
return NULL; return NULL;
} }
state->lastExceptionStackTrace = NULL; state->lastExceptionStackTrace = NULL;