HADOOP-14451. Deadlock in NativeIO (#6632)

This commit is contained in:
Vinayakumar B 2024-03-18 10:53:21 +05:30 committed by GitHub
parent b25b28e5bb
commit 0f51d2a4ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 191 additions and 48 deletions

View File

@ -220,6 +220,9 @@ public long getLength() {
} }
} }
/** Initialize the JNI method ID and class ID cache. */
private static native void initNativePosix(boolean doThreadsafeWorkaround);
/** /**
* JNI wrapper of persist memory operations. * JNI wrapper of persist memory operations.
*/ */
@ -331,11 +334,11 @@ public boolean verifyCanMlock() {
if (NativeCodeLoader.isNativeCodeLoaded()) { if (NativeCodeLoader.isNativeCodeLoaded()) {
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
workaroundNonThreadSafePasswdCalls = conf.getBoolean( boolean workaroundNonThreadSafePasswdCalls = conf.getBoolean(
WORKAROUND_NON_THREADSAFE_CALLS_KEY, WORKAROUND_NON_THREADSAFE_CALLS_KEY,
WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT); WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
initNative(); initNativePosix(workaroundNonThreadSafePasswdCalls);
nativeLoaded = true; nativeLoaded = true;
cacheTimeout = conf.getLong( cacheTimeout = conf.getLong(
@ -679,9 +682,6 @@ public static native void munmap(long addr, long length)
throws IOException; throws IOException;
} }
private static boolean workaroundNonThreadSafePasswdCalls = false;
public static class Windows { public static class Windows {
// Flags for CreateFile() call on Windows // Flags for CreateFile() call on Windows
public static final long GENERIC_READ = 0x80000000L; public static final long GENERIC_READ = 0x80000000L;
@ -833,7 +833,9 @@ public static boolean access(String path, AccessRight desiredAccess)
static { static {
if (NativeCodeLoader.isNativeCodeLoaded()) { if (NativeCodeLoader.isNativeCodeLoaded()) {
try { try {
initNative(); initNativeWindows(false);
// As of now there is no change between initNative()
// and initNativeWindows() native impls.
nativeLoaded = true; nativeLoaded = true;
} catch (Throwable t) { } catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so // This can happen if the user has an older version of libhadoop.so
@ -843,6 +845,10 @@ public static boolean access(String path, AccessRight desiredAccess)
} }
} }
} }
/** Initialize the JNI method ID and class ID cache. */
private static native void initNativeWindows(
boolean doThreadsafeWorkaround);
} }
private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class); private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
@ -852,7 +858,7 @@ public static boolean access(String path, AccessRight desiredAccess)
static { static {
if (NativeCodeLoader.isNativeCodeLoaded()) { if (NativeCodeLoader.isNativeCodeLoaded()) {
try { try {
initNative(); initNative(false);
nativeLoaded = true; nativeLoaded = true;
} catch (Throwable t) { } catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so // This can happen if the user has an older version of libhadoop.so
@ -871,7 +877,7 @@ public static boolean isAvailable() {
} }
/** Initialize the JNI method ID and class ID cache */ /** Initialize the JNI method ID and class ID cache */
private static native void initNative(); private static native void initNative(boolean doThreadsafeWorkaround);
/** /**
* Get the maximum number of bytes that can be locked into memory at any * Get the maximum number of bytes that can be locked into memory at any

View File

@ -103,24 +103,6 @@ extern void throw_ioe(JNIEnv* env, int errnum);
static ssize_t get_pw_buflen(); static ssize_t get_pw_buflen();
#endif #endif
/**
* Returns non-zero if the user has specified that the system
* has non-threadsafe implementations of getpwuid_r or getgrgid_r.
**/
static int workaround_non_threadsafe_calls(JNIEnv *env, jclass clazz) {
jboolean result;
jfieldID needs_workaround_field = (*env)->GetStaticFieldID(
env, clazz,
"workaroundNonThreadSafePasswdCalls",
"Z");
PASS_EXCEPTIONS_RET(env, 0);
assert(needs_workaround_field);
result = (*env)->GetStaticBooleanField(
env, clazz, needs_workaround_field);
return result;
}
/** /**
* Sets a static boolean field to the specified value. * Sets a static boolean field to the specified value.
*/ */
@ -201,10 +183,9 @@ static void consts_init(JNIEnv *env) {
} }
#endif #endif
static void stat_init(JNIEnv *env, jclass nativeio_class) { static void stat_init(JNIEnv *env) {
jclass clazz = NULL; jclass clazz = NULL;
jclass obj_class = NULL; if (stat_ctor2 != NULL) return; //Already inited
jmethodID obj_ctor = NULL;
// Init Stat // Init Stat
clazz = (*env)->FindClass(env, NATIVE_IO_STAT_CLASS); clazz = (*env)->FindClass(env, NATIVE_IO_STAT_CLASS);
if (!clazz) { if (!clazz) {
@ -224,6 +205,20 @@ static void stat_init(JNIEnv *env, jclass nativeio_class) {
if (!stat_ctor2) { if (!stat_ctor2) {
return; // exception has been raised return; // exception has been raised
} }
}
static void stat_deinit(JNIEnv *env) {
if (stat_clazz != NULL) {
(*env)->DeleteGlobalRef(env, stat_clazz);
stat_clazz = NULL;
}
}
static void workaround_non_threadsafe_calls_init(JNIEnv *env){
jclass obj_class = NULL;
jmethodID obj_ctor = NULL;
if (pw_lock_object != NULL) return; // Already inited
obj_class = (*env)->FindClass(env, "java/lang/Object"); obj_class = (*env)->FindClass(env, "java/lang/Object");
if (!obj_class) { if (!obj_class) {
return; // exception has been raised return; // exception has been raised
@ -233,21 +228,13 @@ static void stat_init(JNIEnv *env, jclass nativeio_class) {
if (!obj_ctor) { if (!obj_ctor) {
return; // exception has been raised return; // exception has been raised
} }
pw_lock_object = (*env)->NewObject(env, obj_class, obj_ctor);
if (workaround_non_threadsafe_calls(env, nativeio_class)) { PASS_EXCEPTIONS(env);
pw_lock_object = (*env)->NewObject(env, obj_class, obj_ctor); pw_lock_object = (*env)->NewGlobalRef(env, pw_lock_object);
PASS_EXCEPTIONS(env); PASS_EXCEPTIONS(env);
pw_lock_object = (*env)->NewGlobalRef(env, pw_lock_object);
PASS_EXCEPTIONS(env);
}
} }
static void stat_deinit(JNIEnv *env) { static void workaround_non_threadsafe_calls_deinit(JNIEnv *env){
if (stat_clazz != NULL) {
(*env)->DeleteGlobalRef(env, stat_clazz);
stat_clazz = NULL;
}
if (pw_lock_object != NULL) { if (pw_lock_object != NULL) {
(*env)->DeleteGlobalRef(env, pw_lock_object); (*env)->DeleteGlobalRef(env, pw_lock_object);
pw_lock_object = NULL; pw_lock_object = NULL;
@ -255,6 +242,7 @@ static void stat_deinit(JNIEnv *env) {
} }
static void nioe_init(JNIEnv *env) { static void nioe_init(JNIEnv *env) {
if (nioe_ctor != NULL) return; // Already inited
// Init NativeIOException // Init NativeIOException
nioe_clazz = (*env)->FindClass( nioe_clazz = (*env)->FindClass(
env, "org/apache/hadoop/io/nativeio/NativeIOException"); env, "org/apache/hadoop/io/nativeio/NativeIOException");
@ -349,17 +337,53 @@ static void pmem_region_deinit(JNIEnv *env) {
*/ */
JNIEXPORT void JNICALL JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_initNative( Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
JNIEnv *env, jclass clazz) { JNIEnv *env, jclass clazz, jboolean doThreadsafeWorkaround) {
nioe_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
fd_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
#ifdef UNIX
errno_enum_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
#endif
if (doThreadsafeWorkaround) {
workaround_non_threadsafe_calls_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
}
return;
error:
// these are all idempotent and safe to call even if the
// class wasn't inited yet
nioe_deinit(env);
fd_deinit(env);
#ifdef UNIX
errno_enum_deinit(env);
#endif
if (doThreadsafeWorkaround) {
workaround_non_threadsafe_calls_deinit(env);
}
}
/*
* private static native void initNativePosix(boolean doThreadsafeWorkaround);
*/
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_initNativePosix(
JNIEnv *env, jclass clazz, jboolean doThreadsafeWorkaround) {
#ifdef UNIX #ifdef UNIX
consts_init(env); consts_init(env);
PASS_EXCEPTIONS_GOTO(env, error); PASS_EXCEPTIONS_GOTO(env, error);
#endif #endif
stat_init(env, clazz); stat_init(env);
PASS_EXCEPTIONS_GOTO(env, error); PASS_EXCEPTIONS_GOTO(env, error);
nioe_init(env); nioe_init(env);
PASS_EXCEPTIONS_GOTO(env, error); PASS_EXCEPTIONS_GOTO(env, error);
fd_init(env); fd_init(env);
PASS_EXCEPTIONS_GOTO(env, error); PASS_EXCEPTIONS_GOTO(env, error);
if (doThreadsafeWorkaround) {
workaround_non_threadsafe_calls_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
}
#ifdef UNIX #ifdef UNIX
errno_enum_init(env); errno_enum_init(env);
PASS_EXCEPTIONS_GOTO(env, error); PASS_EXCEPTIONS_GOTO(env, error);
@ -373,17 +397,43 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
error: error:
// these are all idempodent and safe to call even if the // these are all idempodent and safe to call even if the
// class wasn't initted yet // class wasn't initted yet
#ifdef UNIX
stat_deinit(env); stat_deinit(env);
#ifdef HADOOP_PMDK_LIBRARY #ifdef HADOOP_PMDK_LIBRARY
pmem_region_deinit(env); pmem_region_deinit(env);
#endif
#endif #endif
nioe_deinit(env); nioe_deinit(env);
fd_deinit(env); fd_deinit(env);
#ifdef UNIX #ifdef UNIX
errno_enum_deinit(env); errno_enum_deinit(env);
#endif #endif
if (doThreadsafeWorkaround) {
workaround_non_threadsafe_calls_deinit(env);
}
}
/*
* private static native void initNativeWindows(boolean doThreadsafeWorkaround);
*/
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_00024Windows_initNativeWindows(
JNIEnv *env, jclass clazz, jboolean doThreadsafeWorkaround) {
nioe_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
fd_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
if (doThreadsafeWorkaround) {
workaround_non_threadsafe_calls_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
}
return;
error:
// these are all idempotent and safe to call even if the
// class wasn't inited yet
nioe_deinit(env);
fd_deinit(env);
if (doThreadsafeWorkaround) {
workaround_non_threadsafe_calls_deinit(env);
}
} }
/* /*

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.nativeio;
import static org.junit.Assume.assumeTrue;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
/**
 * Kept as a separate class to ensure forked tests load the static blocks
 * again.
 */
public class TestNativeIoInit {

  /**
   * Refer HADOOP-14451.
   * Scenario:
   * 1. One thread calls a static method of NativeIO, which loads the static
   *    block of NativeIO.
   * 2. A second thread calls a static method of NativeIO.POSIX, which loads
   *    the static block of the NativeIO.POSIX class.
   * <p>
   * Expected: loading these two static blocks separately should not result
   * in a deadlock.
   */
  @Test(timeout = 10000)
  public void testDeadlockLinux() throws Exception {
    Thread outerClassCaller = new Thread(() -> {
      NativeIO.isAvailable();
    });
    Thread posixClassCaller = new Thread(() -> {
      NativeIO.POSIX.isAvailable();
    });
    startAndJoin(outerClassCaller, posixClassCaller);
  }

  /**
   * Same scenario as {@link #testDeadlockLinux()}, but the second thread
   * calls a static method of NativeIO.Windows instead. The test is skipped
   * unless {@code Path.WINDOWS} is true.
   */
  @Test(timeout = 10000)
  public void testDeadlockWindows() throws Exception {
    assumeTrue("Expected windows", Path.WINDOWS);
    Thread outerClassCaller = new Thread(() -> {
      NativeIO.isAvailable();
    });
    Thread windowsClassCaller = new Thread(() -> {
      try {
        NativeIO.Windows.extendWorkingSetSize(100);
      } catch (IOException ignored) {
        // ignored; the test only waits for both threads to finish
      }
    });
    startAndJoin(outerClassCaller, windowsClassCaller);
  }

  /**
   * Starts {@code two} first and then {@code one}, and afterwards waits for
   * {@code one} and then {@code two} to terminate.
   *
   * @param one the thread started second and joined first
   * @param two the thread started first and joined second
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private static void startAndJoin(Thread one, Thread two)
      throws InterruptedException {
    two.start();
    one.start();
    one.join();
    two.join();
  }
}