diff --git a/BUILDING.txt b/BUILDING.txt
index cc9ac177ca..8c57a1d8e2 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -78,6 +78,8 @@ Optional packages:
$ sudo apt-get install fuse libfuse-dev
* ZStandard compression
$ sudo apt-get install zstd
+* PMDK library for storage class memory (SCM) as HDFS cache backend
+ Please refer to http://pmem.io/ and https://github.com/pmem/pmdk
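+ (On recent Debian/Ubuntu releases, libpmem may also be available as a distro
+ package, e.g. $ sudo apt-get install libpmem-dev.)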
----------------------------------------------------------------------------------
Maven main modules:
@@ -262,6 +264,32 @@ Maven build goals:
invoke, run 'mvn dependency-check:aggregate'. Note that this plugin
requires maven 3.1.1 or greater.
+ PMDK library build options:
+
+ The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing
+ collection of libraries which have been developed for various use cases, tuned,
+ validated to production quality, and thoroughly documented. These libraries are built
+ on the Direct Access (DAX) feature available in both Linux and Windows, which allows
+ applications direct load/store access to persistent memory by memory-mapping files
+ on a persistent memory aware file system.
+
+ It is currently an optional component, meaning that Hadoop can be built without
+ this dependency. Please note that the library is loaded as a dynamic module at
+ runtime. For more details, please refer to the official sites:
+ http://pmem.io/ and https://github.com/pmem/pmdk.
+
+ * -Drequire.pmdk is used to force building the project with PMDK libraries. With this
+ option provided, the build will fail if the libpmem library is not found. If this option
+ is not given, the build will still generate a version of Hadoop with libhadoop.so,
+ and storage class memory (SCM) backed HDFS cache is still supported without PMDK.
+ Because PMDK can bring better cache write/read performance, it is recommended to build
+ the project with this option if users plan to use SCM backed HDFS cache.
+ * -Dpmdk.lib is used to specify a nonstandard location for PMDK libraries if they are not
+ under /usr/lib or /usr/lib64.
+ * -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution tar
+ package. This option requires that -Dpmdk.lib is also specified; with -Dbundle.pmdk
+ provided, the build will fail if -Dpmdk.lib is missing.
+
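+ For example, a build that requires PMDK, points at a custom library location and
+ bundles libpmem into the distribution could be invoked as follows (illustrative
+ command; adjust the library path to your PMDK installation):
+
+ $ mvn package -Pdist,native -DskipTests -Dtar \
+ -Drequire.pmdk -Dpmdk.lib=/usr/local/pmdk/lib -Dbundle.pmdk
+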
----------------------------------------------------------------------------------
Building components separately
diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs
index 67d2edf22d..4a783f086a 100755
--- a/dev-support/bin/dist-copynativelibs
+++ b/dev-support/bin/dist-copynativelibs
@@ -96,6 +96,12 @@ for i in "$@"; do
--isalbundle=*)
ISALBUNDLE=${i#*=}
;;
+ --pmdklib=*)
+ PMDKLIB=${i#*=}
+ ;;
+ --pmdkbundle=*)
+ PMDKBUNDLE=${i#*=}
+ ;;
--opensslbinbundle=*)
OPENSSLBINBUNDLE=${i#*=}
;;
@@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then
bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"
bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
+
+ bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmdk" "${PMDKLIB}"
fi
# Windows
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 64e4d04419..5b600538d6 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -682,6 +682,8 @@
${require.isal}
${isal.prefix}
${isal.lib}
+ ${require.pmdk}
+ ${pmdk.lib}
${require.openssl}
${openssl.prefix}
${openssl.lib}
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index b9287c0f4b..771c685c70 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -121,6 +121,7 @@ else ()
ENDIF(REQUIRE_ZSTD)
endif ()
+# Require ISA-L
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("2")
find_library(ISAL_LIBRARY
@@ -159,6 +160,25 @@ else (ISAL_LIBRARY)
ENDIF(REQUIRE_ISAL)
endif (ISAL_LIBRARY)
+# Build with PMDK library if -Drequire.pmdk option is specified.
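+# Note: REQUIRE_PMDK and CUSTOM_PMDK_LIB below are populated from the maven
+# options -Drequire.pmdk and -Dpmdk.lib (wired through hadoop-common's pom.xml).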
+if(REQUIRE_PMDK)
+ set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+ hadoop_set_find_shared_library_version("1")
+ find_library(PMDK_LIBRARY
+ NAMES pmem
+ PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64)
+ set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+ if(PMDK_LIBRARY)
+ GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
+ set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
+ else(PMDK_LIBRARY)
+ MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
+ endif(PMDK_LIBRARY)
+else(REQUIRE_PMDK)
+ MESSAGE(STATUS "Build without PMDK support.")
+endif(REQUIRE_PMDK)
+
# Build hardware CRC32 acceleration, if supported on the platform.
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop
${SRC}/io/compress/zlib/ZlibDecompressor.c
${BZIP2_SOURCE_FILES}
${SRC}/io/nativeio/NativeIO.c
+ ${PMDK_SOURCE_FILES}
${SRC}/io/nativeio/errno_enum.c
${SRC}/io/nativeio/file_descriptor.c
${SRC}/io/nativeio/SharedFileDescriptorFactory.c
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index 40aa467373..7e23a5df32 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -24,6 +24,7 @@
#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
#cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
+#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 4e0cd8fdd8..1d0eab7f5c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -100,6 +100,48 @@ public static class POSIX {
write. */
public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+ /**
+ * Keeps the support state of PMDK.
+ */
+ public enum SupportState {
+ UNSUPPORTED(-1),
+ PMDK_LIB_NOT_FOUND(1),
+ SUPPORTED(0);
+
+ private byte stateCode;
+ SupportState(int stateCode) {
+ this.stateCode = (byte) stateCode;
+ }
+
+ public int getStateCode() {
+ return stateCode;
+ }
+
+ public String getMessage() {
+ String msg;
+ switch (stateCode) {
+ case -1:
+ msg = "The native code is built without PMDK support.";
+ break;
+ case 1:
+ msg = "The native code is built with PMDK support, but PMDK libs " +
+ "are NOT found in execution environment or failed to be loaded.";
+ break;
+ case 0:
+ msg = "The native code is built with PMDK support, and PMDK libs " +
+ "are loaded successfully.";
+ break;
+ default:
+ msg = "The state code: " + stateCode + " is unrecognized!";
+ }
+ return msg;
+ }
+ }
+
+ // Denotes the state of supporting PMDK. The value is set by JNI.
+ private static SupportState pmdkSupportState =
+ SupportState.PMDK_LIB_NOT_FOUND;
+
private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
// Set to true via JNI if possible
@@ -124,6 +166,93 @@ public static void setCacheManipulator(CacheManipulator cacheManipulator) {
POSIX.cacheManipulator = cacheManipulator;
}
+ // This method is invoked by JNI.
+ public static void setPmdkSupportState(int stateCode) {
+ for (SupportState state : SupportState.values()) {
+ if (state.getStateCode() == stateCode) {
+ pmdkSupportState = state;
+ return;
+ }
+ }
+ LOG.error("The state code: " + stateCode + " is unrecognized!");
+ }
+
+ public static boolean isPmdkAvailable() {
+ LOG.info(pmdkSupportState.getMessage());
+ return pmdkSupportState == SupportState.SUPPORTED;
+ }
+
+ /**
+ * Denotes the memory region that a file is mapped to.
+ */
+ public static class PmemMappedRegion {
+ private long address;
+ private long length;
+ private boolean isPmem;
+
+ public PmemMappedRegion(long address, long length, boolean isPmem) {
+ this.address = address;
+ this.length = length;
+ this.isPmem = isPmem;
+ }
+
+ public boolean isPmem() {
+ return this.isPmem;
+ }
+
+ public long getAddress() {
+ return this.address;
+ }
+
+ public long getLength() {
+ return this.length;
+ }
+ }
+
+ /**
+ * JNI wrapper of persistent memory operations.
+ */
+ public static class Pmem {
+ // check whether the address is on persistent memory (pmem) or ordinary DRAM
+ public static boolean isPmem(long address, long length) {
+ return NativeIO.POSIX.isPmemCheck(address, length);
+ }
+
+ // create a pmem file and memory map it
+ public static PmemMappedRegion mapBlock(String path, long length) {
+ return NativeIO.POSIX.pmemCreateMapFile(path, length);
+ }
+
+ // unmap a pmem file
+ public static boolean unmapBlock(long address, long length) {
+ return NativeIO.POSIX.pmemUnMap(address, length);
+ }
+
+ // copy data from a disk file (src) to a pmem file (dest), without flush
+ public static void memCopy(byte[] src, long dest, boolean isPmem,
+ long length) {
+ NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
+ }
+
+ // flush the memory content to persistent storage
+ public static void memSync(PmemMappedRegion region) {
+ if (region.isPmem()) {
+ NativeIO.POSIX.pmemDrain();
+ } else {
+ NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
+ }
+ }
+ }
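+
+ // Illustrative call sequence for the Pmem wrapper above (the pmem mount
+ // point and sizes are examples only):
+ //   PmemMappedRegion region = Pmem.mapBlock("/mnt/pmem0/blk_1", 4096L);
+ //   Pmem.memCopy(data, region.getAddress(), region.isPmem(), 4096L);
+ //   Pmem.memSync(region); // pmem_drain() or pmem_msync() underneath
+ //   Pmem.unmapBlock(region.getAddress(), region.getLength());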
+
+ private static native boolean isPmemCheck(long address, long length);
+ private static native PmemMappedRegion pmemCreateMapFile(String path,
+ long length);
+ private static native boolean pmemUnMap(long address, long length);
+ private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
+ long length);
+ private static native void pmemDrain();
+ private static native void pmemSync(long address, long length);
+
/**
* Used to manipulate the operating system cache.
*/
@@ -143,8 +272,8 @@ public long getOperatingSystemPageSize() {
}
public void posixFadviseIfPossible(String identifier,
- FileDescriptor fd, long offset, long len, int flags)
- throws NativeIOException {
+ FileDescriptor fd, long offset, long len, int flags)
+ throws NativeIOException {
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
len, flags);
}
@@ -748,7 +877,7 @@ public CachedUid(String username, long timestamp) {
* user account name, of the format DOMAIN\UserName. This method
* will remove the domain part of the full logon name.
*
- * @param Fthe full principal name containing the domain
+ * @param name the full principal name containing the domain
* @return name with domain removed
*/
private static String stripDomain(String name) {
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index 2274d57ca9..3a0641ba26 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -36,6 +36,10 @@
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/syscall.h>
+#ifdef HADOOP_PMDK_LIBRARY
+#include <libpmem.h>
+#include "pmdk_load.h"
+#endif
#if !(defined(__FreeBSD__) || defined(__MACH__))
#include <sys/sendfile.h>
#endif
@@ -60,6 +64,7 @@
#define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX"
#define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat"
+#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion"
#define SET_INT_OR_RETURN(E, C, F) \
{ \
@@ -81,6 +86,12 @@ static jmethodID nioe_ctor;
// Please see HADOOP-7156 for details.
jobject pw_lock_object;
+#ifdef HADOOP_PMDK_LIBRARY
+// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor
+static jclass pmem_region_clazz = NULL;
+static jmethodID pmem_region_ctor = NULL;
+#endif
+
/*
* Throw a java.IO.IOException, generating the message from errno.
* NB. this is also used form windows_secure_container_executor.c
@@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) {
nioe_ctor = NULL;
}
+#ifdef HADOOP_PMDK_LIBRARY
+static int loadPmdkLib(JNIEnv *env) {
+ char errMsg[1024];
+ jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS);
+ if (clazz == NULL) {
+ return 0; // exception has been raised
+ }
+ load_pmdk_lib(errMsg, sizeof(errMsg));
+ jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V");
+ if (mid == 0) {
+ return 0;
+ }
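+ // The int passed to setPmdkSupportState below maps to the Java-side
+ // NativeIO.POSIX.SupportState codes: 1 = PMDK_LIB_NOT_FOUND, 0 = SUPPORTED.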
+ if (strlen(errMsg) > 0) {
+ (*env)->CallStaticVoidMethod(env, clazz, mid, 1);
+ return 0;
+ }
+ (*env)->CallStaticVoidMethod(env, clazz, mid, 0);
+ return 1;
+}
+
+static void pmem_region_init(JNIEnv *env, jclass nativeio_class) {
+
+ jclass clazz = NULL;
+ // Find the PmemMappedRegion class
+ clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS);
+ if (!clazz) {
+ THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion class");
+ return; // exception has been raised
+ }
+
+ // Init PmemMappedRegion class
+ pmem_region_clazz = (*env)->NewGlobalRef(env, clazz);
+ if (!pmem_region_clazz) {
+ THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class");
+ return; // exception has been raised
+ }
+
+ pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V");
+ if (!pmem_region_ctor) {
+ THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion constructor");
+ return; // exception has been raised
+ }
+}
+
+static void pmem_region_deinit(JNIEnv *env) {
+ // A jmethodID is not a JNI object reference, so it must not be passed to
+ // DeleteGlobalRef; clearing the cached value is sufficient.
+ pmem_region_ctor = NULL;
+
+ if (pmem_region_clazz != NULL) {
+ (*env)->DeleteGlobalRef(env, pmem_region_clazz);
+ pmem_region_clazz = NULL;
+ }
+}
+#endif
+
/*
* private static native void initNative();
*
@@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
#ifdef UNIX
errno_enum_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
+#ifdef HADOOP_PMDK_LIBRARY
+ if (loadPmdkLib(env)) {
+ pmem_region_init(env, clazz);
+ }
+#endif
#endif
return;
error:
@@ -299,6 +372,9 @@ error:
// class wasn't initted yet
#ifdef UNIX
stat_deinit(env);
+#ifdef HADOOP_PMDK_LIBRARY
+ pmem_region_deinit(env);
+#endif
#endif
nioe_deinit(env);
fd_deinit(env);
@@ -1383,3 +1459,179 @@ cleanup:
/**
* vim: sw=2: ts=2: et:
*/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: isPmemCheck
+ * Signature: (JJ)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ jint is_pmem = pmdkLoader->pmem_is_pmem((void *)address, (size_t)length);
+ return (is_pmem) ? JNI_TRUE : JNI_FALSE;
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function isPmemCheck is not supported.");
+ return JNI_FALSE;
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemCreateMapFile
+ * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
+JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ /* create a pmem file and memory map it */
+ const char * path = NULL;
+ void * pmemaddr = NULL;
+ size_t mapped_len = 0;
+ int is_pmem = 1;
+ char msg[1000];
+
+ path = (*env)->GetStringUTFChars(env, filePath, NULL);
+ if (!path) {
+ THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
+ return NULL;
+ }
+
+ if (fileLength <= 0) {
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+ THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
+ return NULL;
+ }
+
+ pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
+ 0666, &mapped_len, &is_pmem);
+
+ if (!pmemaddr) {
+ snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+ return NULL;
+ }
+
+ if (fileLength != mapped_len) {
+ snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%x, returned length:%x, error msg:%s", path, fileLength, mapped_len, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+ return NULL;
+ }
+
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+
+ if ((!pmem_region_clazz) || (!pmem_region_ctor)) {
+ THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL");
+ return NULL;
+ }
+
+ jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor, (jlong)pmemaddr, (jlong)mapped_len, (jboolean)is_pmem);
+ return ret;
+
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemCreateMapFile is not supported.");
+ return NULL;
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemUnMap
+ * Signature: (JJ)V
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ int succeed = 0;
+ char msg[1000];
+ succeed = pmdkLoader->pmem_unmap((void *)address, (size_t)length);
+ // succeed = -1 failure; succeed = 0 success
+ if (succeed != 0) {
+ snprintf(msg, sizeof(msg), "Failed to unmap region. address: %lld, length: %lld, error msg: %s", (long long)address, (long long)length, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ return JNI_FALSE;
+ } else {
+ return JNI_TRUE;
+ }
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemUnMap is not supported.");
+ return JNI_FALSE;
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemCopy
+ * Signature: ([BJJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy(
+JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ char msg[1000];
+ jbyte* srcBuf = (*env)->GetByteArrayElements(env, buf, 0);
+ snprintf(msg, sizeof(msg), "Pmem copy content. dest: %x, length: %x, src: %x ", address, length, srcBuf);
+ if (is_pmem) {
+ pmdkLoader->pmem_memcpy_nodrain(address, srcBuf, length);
+ } else {
+ memcpy(address, srcBuf, length);
+ }
+ (*env)->ReleaseByteArrayElements(env, buf, srcBuf, 0);
+ return;
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemCopy is not supported.");
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO
+ * Method: pmemDrain
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain(
+JNIEnv *env, jclass thisClass) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ pmdkLoader->pmem_drain();
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemDrain is not supported.");
+ #endif
+ }
+
+ /*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemSync
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync
+ (JNIEnv * env, jclass thisClass, jlong address, jlong length) {
+
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ int succeed = 0;
+ char msg[1000];
+ succeed = pmdkLoader->pmem_msync((void *)address, (size_t)length);
+ // succeed = -1 failure
+ if (succeed == -1) {
+ snprintf(msg, sizeof(msg), "Failed to msync region. address: %lld, length: %lld, error msg: %s", (long long)address, (long long)length, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ return;
+ }
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemSync is not supported.");
+ #endif
+ }
+
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
new file mode 100644
index 0000000000..f7d6cfba27
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stddef.h>
+
+#include "org_apache_hadoop.h"
+#include "pmdk_load.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+
+#include "config.h"
+#endif
+
+PmdkLibLoader * pmdkLoader;
+
+/**
+ * pmdk_load.c
+ * Utility for loading the libpmem library and the required functions.
+ * Building this code does not rely on any libpmem source code, but running
+ * it relies on successfully loading the dynamic library.
+ *
+ */
+
+static const char* load_functions() {
+#ifdef UNIX
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync");
+#endif
+ return NULL;
+}
+
+void load_pmdk_lib(char* err, size_t err_len) {
+ const char* errMsg;
+ const char* library = NULL;
+#ifdef UNIX
+ Dl_info dl_info;
+#else
+ LPTSTR filename = NULL;
+#endif
+
+ err[0] = '\0';
+
+ if (pmdkLoader != NULL) {
+ return;
+ }
+ pmdkLoader = calloc(1, sizeof(PmdkLibLoader));
+
+ // Load PMDK library
+ #ifdef UNIX
+ pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+ if (pmdkLoader->libec == NULL) {
+ snprintf(err, err_len, "Failed to load %s (%s)",
+ HADOOP_PMDK_LIBRARY, dlerror());
+ return;
+ }
+ // Clear any existing error
+ dlerror();
+ #endif
+ errMsg = load_functions();
+ if (errMsg != NULL) {
+ snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
+ }
+
+#ifdef UNIX
+ if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
+ library = dl_info.dli_fname;
+ }
+#else
+ if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
+ library = filename;
+ }
+#endif
+
+ if (library == NULL) {
+ library = HADOOP_PMDK_LIBRARY;
+ }
+
+ pmdkLoader->libname = strdup(library);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
new file mode 100644
index 0000000000..c93a076fc0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stddef.h>
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <dlfcn.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#endif
+
+#ifndef _PMDK_LOAD_H_
+#define _PMDK_LOAD_H_
+
+
+#ifdef UNIX
+// For libpmem.h
+typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode,
+ size_t *mapped_lenp, int *is_pmemp);
+typedef int (* __d_pmem_unmap)(void *addr, size_t len);
+typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len);
+typedef void (*__d_pmem_drain)(void);
+typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len);
+typedef int (* __d_pmem_msync)(const void *addr, size_t len);
+
+#endif
+
+typedef struct __PmdkLibLoader {
+ // The loaded library handle
+ void* libec;
+ char* libname;
+ __d_pmem_map_file pmem_map_file;
+ __d_pmem_unmap pmem_unmap;
+ __d_pmem_is_pmem pmem_is_pmem;
+ __d_pmem_drain pmem_drain;
+ __d_pmem_memcpy_nodrain pmem_memcpy_nodrain;
+ __d_pmem_msync pmem_msync;
+} PmdkLibLoader;
+
+extern PmdkLibLoader * pmdkLoader;
+
+/**
+ * A helper function to dlsym a 'symbol' from a given library-handle.
+ */
+
+#ifdef UNIX
+
+static __attribute__ ((unused))
+void *myDlsym(void *handle, const char *symbol) {
+ void *func_ptr = dlsym(handle, symbol);
+ return func_ptr;
+}
+
+/* A helper macro to dlsym the requisite dynamic symbol in NON-JNI env. */
+#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \
+ if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \
+ return "Failed to load symbol" symbol; \
+ }
+
+#endif
+
+/**
+ * Return 0 if not supported, 1 otherwise.
+ */
+int build_support_pmdk();
+
+/**
+ * Initialize and load PMDK library, returning error message if any.
+ *
+ * @param err The err message buffer.
+ * @param err_len The length of the message buffer.
+ */
+void load_pmdk_lib(char* err, size_t err_len);
+
+#endif //_PMDK_LOAD_H_
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 6b3c2325d8..a14928c7b4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -25,6 +25,8 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
@@ -782,4 +784,155 @@ public void testNativeFadviseConsts() {
assertTrue("Native POSIX_FADV_NOREUSE const not set",
POSIX_FADV_NOREUSE >= 0);
}
+
+
+ @Test (timeout=10000)
+ public void testPmemCheckParameters() {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing if the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Please make sure /mnt/pmem0 is a persistent memory device with total
+ // volume size 'volumeSize'
+ String filePath = "/$:";
+ long length = 0;
+ long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+ // Illegal file path and zero file length
+ try {
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("Illegal length parameter should be detected");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+
+ // Negative file length
+ filePath = "/mnt/pmem0/test_native_io";
+ length = -1L;
+ try {
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("Illegal length parameter should be detected");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test (timeout=10000)
+ public void testPmemMapMultipleFiles() {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing if the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Please make sure /mnt/pmem0 is a persistent memory device with total
+ // volume size 'volumeSize'
+ String filePath = "/mnt/pmem0/test_native_io";
+ long length = 0;
+ long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+ // Multiple files, each with 128MB size, aggregated size exceeds volume
+ // limit 16GB
+ length = 128 * 1024 * 1024L;
+ long fileNumber = volumeSize / length;
+ LOG.info("File number = " + fileNumber);
+ for (int i = 0; i < fileNumber; i++) {
+ String path = filePath + i;
+ LOG.info("File path = " + path);
+ NativeIO.POSIX.Pmem.mapBlock(path, length);
+ }
+ try {
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("Request map extra file when persistent memory is all occupied");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test (timeout=10000)
+ public void testPmemMapBigFile() {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing if the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Please make sure /mnt/pmem0 is a persistent memory device with total
+ // volume size 'volumeSize'
+ String filePath = "/mnt/pmem0/test_native_io_big";
+ long length = 0;
+ long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+ // One file length exceeds persistent memory volume 16GB.
+ length = volumeSize + 1024L;
+ try {
+ LOG.info("File length = " + length);
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("File length exceeds persistent memory total volume size");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ deletePmemMappedFile(filePath);
+ }
+ }
+
+ @Test (timeout=10000)
+ public void testPmemCopy() throws IOException {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing if the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Create and map a block file. Please make sure /mnt/pmem0 is a persistent
+ // memory device.
+ String filePath = "/mnt/pmem0/copy";
+ long length = 4096;
+ PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
+ assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
+ assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
+ assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length));
+
+ // Copy content to mapped file
+ byte[] data = generateSequentialBytes(0, (int) length);
+ NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(),
+ length);
+
+ // Read content before pmemSync
+ byte[] readBuf1 = new byte[(int)length];
+ IOUtils.readFully(new FileInputStream(filePath), readBuf1, 0, (int)length);
+ assertArrayEquals(data, readBuf1);
+
+ byte[] readBuf2 = new byte[(int)length];
+ // Sync content to persistent memory twice
+ NativeIO.POSIX.Pmem.memSync(region);
+ NativeIO.POSIX.Pmem.memSync(region);
+ // Read content after pmemSync twice
+ IOUtils.readFully(new FileInputStream(filePath), readBuf2, 0, (int)length);
+ assertArrayEquals(data, readBuf2);
+
+ // Read content after unmap twice
+ NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+ NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+ byte[] readBuf3 = new byte[(int)length];
+ IOUtils.readFully(new FileInputStream(filePath), readBuf3, 0, (int)length);
+ assertArrayEquals(data, readBuf3);
+ }
+
+ private static byte[] generateSequentialBytes(int start, int length) {
+ byte[] result = new byte[length];
+
+ for (int i = 0; i < length; i++) {
+ result[i] = (byte) ((start + i) % 127);
+ }
+ return result;
+ }
+
+ private static void deletePmemMappedFile(String filePath) {
+ try {
+ if (filePath != null) {
+ boolean result = Files.deleteIfExists(Paths.get(filePath));
+ if (!result) {
+ throw new IOException();
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Failed to delete the mapped file " + filePath +
+ " from persistent memory", e);
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 4fab214a05..37e548e220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -214,6 +214,28 @@ String getReplicaCachePath(String bpid, long blockId) {
return PmemVolumeManager.getInstance().getCachePath(key);
}
+ /**
+ * Get the cache address on persistent memory for a read operation.
+ * The cache address is returned by the PMDK lib function when the
+ * block is mapped to persistent memory.
+ *
+ * @param bpid blockPoolId
+ * @param blockId blockId
+ * @return address
+ */
+ long getCacheAddress(String bpid, long blockId) {
+ if (cacheLoader.isTransientCache() ||
+ !isCached(bpid, blockId)) {
+ return -1;
+ }
+ if (!(cacheLoader.isNativeLoader())) {
+ return -1;
+ }
+ ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
+ MappableBlock mappableBlock = mappableBlockMap.get(key).mappableBlock;
+ return mappableBlock.getAddress();
+ }
+
/**
* @return List of cached blocks suitable for translation into a
* {@link BlockListAsLongs} for a cache report.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 80738d3dcf..76110d68b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -803,6 +803,14 @@ private InputStream getBlockInputStreamWithCheckingPmemCache(
String cachePath = cacheManager.getReplicaCachePath(
b.getBlockPoolId(), b.getBlockId());
if (cachePath != null) {
+ long addr = cacheManager.getCacheAddress(
+ b.getBlockPoolId(), b.getBlockId());
+ if (addr != -1) {
+ LOG.debug("Get InputStream by cache address.");
+ return FsDatasetUtil.getDirectInputStream(
+ addr, info.getBlockDataLength());
+ }
+ LOG.debug("Get InputStream by cache file path.");
return FsDatasetUtil.getInputStreamAndSeek(
new File(cachePath), seekOffset);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 5308b60b59..fbd02c7682 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -25,7 +25,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -42,6 +45,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
/** Utility methods. */
@InterfaceAudience.Private
@@ -131,6 +135,24 @@ public static InputStream getInputStreamAndSeek(File file, long offset)
}
}
+ public static InputStream getDirectInputStream(long addr, long length)
+ throws IOException {
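+ // Wrap the raw cache address in a ByteBuffer by reflectively invoking the
+ // package-private DirectByteBuffer(long, int) constructor; the JDK offers
+ // no public API for viewing a native address as a ByteBuffer.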
+ try {
+ Class<?> directByteBufferClass =
+ Class.forName("java.nio.DirectByteBuffer");
+ Constructor<?> constructor =
+ directByteBufferClass.getDeclaredConstructor(long.class, int.class);
+ constructor.setAccessible(true);
+ ByteBuffer byteBuffer =
+ (ByteBuffer) constructor.newInstance(addr, (int)length);
+ return new ByteBufferBackedInputStream(byteBuffer);
+ } catch (ClassNotFoundException | NoSuchMethodException |
+ IllegalAccessException | InvocationTargetException |
+ InstantiationException e) {
+ throw new IOException(e);
+ }
+ }
+
/**
* Find the meta-file for the specified block file and then return the
* generation stamp from the name of the meta-file. Generally meta file will
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 0fff32741c..a00c442b83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable {
* @return the number of bytes that have been cached.
*/
long getLength();
+
+ /**
+ * Get cache address if applicable.
+ * Return -1 if not applicable.
+ */
+ long getAddress();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 3ec84164c8..5b9ba3a1d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -64,8 +64,7 @@ public abstract class MappableBlockLoader {
* @return The Mappable block.
*/
abstract MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException;
/**
@@ -106,6 +105,11 @@ abstract MappableBlock load(long length, FileInputStream blockIn,
*/
abstract boolean isTransientCache();
+ /**
+ * Check whether this is a native pmem cache loader.
+ */
+ abstract boolean isNativeLoader();
+
/**
* Clean up cache, can be used during DataNode shutdown.
*/
@@ -117,8 +121,7 @@ void shutdown() {
* Verifies the block's checksum. This is an I/O intensive operation.
*/
protected void verifyChecksum(long length, FileInputStream metaIn,
- FileChannel blockChannel, String blockFileName)
- throws IOException {
+ FileChannel blockChannel, String blockFileName) throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
index 43b1b531af..65693735b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.io.nativeio.NativeIO;
/**
* Creates MappableBlockLoader.
@@ -42,6 +43,9 @@ public static MappableBlockLoader createCacheLoader(DNConf conf) {
if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
return new MemoryMappableBlockLoader();
}
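+ // Prefer the native PMDK-backed loader when the native code was built with
+ // PMDK support and libpmem was loaded successfully; otherwise fall back to
+ // the pure-Java pmem loader.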
+ if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) {
+ return new NativePmemMappableBlockLoader();
+ }
return new PmemMappableBlockLoader();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 52d8d931c0..dd4188c0b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -66,8 +66,7 @@ void initialize(FsDatasetCache cacheManager) throws IOException {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException {
MemoryMappedBlock mappableBlock = null;
MappedByteBuffer mmap = null;
@@ -116,4 +115,9 @@ long release(ExtendedBlockId key, long bytesCount) {
public boolean isTransientCache() {
return true;
}
+
+ @Override
+ public boolean isNativeLoader() {
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
index c09ad1a588..47dfeae326 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -44,6 +44,11 @@ public long getLength() {
return length;
}
+ @Override
+ public long getAddress() {
+ return -1L;
+ }
+
@Override
public void close() {
if (mmap != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
new file mode 100644
index 0000000000..09e9454e76
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Map block to persistent memory with native PMDK libs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
+
+ @Override
+ void initialize(FsDatasetCache cacheManager) throws IOException {
+ super.initialize(cacheManager);
+ }
+
+ /**
+ * Load the block.
+ *
+ * Map the block and verify its checksum.
+ *
+ * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
+ * is a persistent memory volume chosen by PmemVolumeManager.
+ *
+ * @param length The current length of the block.
+ * @param blockIn The block input stream. Should be positioned at the
+ * start. The caller must close this.
+ * @param metaIn The meta file input stream. Should be positioned at
+ * the start. The caller must close this.
+ * @param blockFileName The block file name, for logging purposes.
+ * @param key The extended block ID.
+ *
+ * @throws IOException If mapping block to persistent memory fails or
+ * checksum fails.
+ *
+ * @return The Mappable block.
+ */
+ @Override
+ public MappableBlock load(long length, FileInputStream blockIn,
+ FileInputStream metaIn, String blockFileName,
+ ExtendedBlockId key)
+ throws IOException {
+ NativePmemMappedBlock mappableBlock = null;
+ POSIX.PmemMappedRegion region = null;
+ String filePath = null;
+
+ FileChannel blockChannel = null;
+ try {
+ blockChannel = blockIn.getChannel();
+ if (blockChannel == null) {
+ throw new IOException("Block InputStream has no FileChannel.");
+ }
+
+ assert NativeIO.isAvailable();
+ filePath = PmemVolumeManager.getInstance().getCachePath(key);
+ region = POSIX.Pmem.mapBlock(filePath, length);
+ if (region == null) {
+ throw new IOException("Failed to map the block " + blockFileName +
+ " to persistent storage.");
+ }
+ verifyChecksumAndMapBlock(region, length, metaIn, blockChannel,
+ blockFileName);
+ mappableBlock = new NativePmemMappedBlock(region.getAddress(),
+ region.getLength(), key);
+ LOG.info("Successfully cached one replica:{} into persistent memory"
+ + ", [cached path={}, address={}, length={}]", key, filePath,
+ region.getAddress(), length);
+ } finally {
+ IOUtils.closeQuietly(blockChannel);
+ if (mappableBlock == null) {
+ if (region != null) {
+ // unmap content from persistent memory
+ POSIX.Pmem.unmapBlock(region.getAddress(),
+ region.getLength());
+ FsDatasetUtil.deleteMappedFile(filePath);
+ }
+ }
+ }
+ return mappableBlock;
+ }
+
+ /**
+ * Verifies the block's checksum while mapping the block to persistent memory.
+ * This is an I/O intensive operation.
+ */
+ private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
+ long length, FileInputStream metaIn, FileChannel blockChannel,
+ String blockFileName) throws IOException {
+ // Verify the checksum from the block's meta file
+ // Get the DataChecksum from the meta file header
+ BlockMetadataHeader header =
+ BlockMetadataHeader.readHeader(new DataInputStream(
+ new BufferedInputStream(metaIn, BlockMetadataHeader
+ .getHeaderSize())));
+ FileChannel metaChannel = null;
+ try {
+ metaChannel = metaIn.getChannel();
+ if (metaChannel == null) {
+ throw new IOException("Cannot get FileChannel" +
+ " from Block InputStream meta file.");
+ }
+ DataChecksum checksum = header.getChecksum();
+ final int bytesPerChecksum = checksum.getBytesPerChecksum();
+ final int checksumSize = checksum.getChecksumSize();
+ final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+ ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+ ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
+ // Verify the checksum
+ int bytesVerified = 0;
+ long mappedAddress = -1L;
+ if (region != null) {
+ mappedAddress = region.getAddress();
+ }
+ while (bytesVerified < length) {
+ Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+ "Unexpected partial chunk before EOF.");
+ assert bytesVerified % bytesPerChecksum == 0;
+ int bytesRead = fillBuffer(blockChannel, blockBuf);
+ if (bytesRead == -1) {
+ throw new IOException(
+ "Checksum verification failed for the block " + blockFileName +
+ ": premature EOF");
+ }
+ blockBuf.flip();
+ // Number of read chunks, including partial chunk at end
+ int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuf.limit(chunks * checksumSize);
+ fillBuffer(metaChannel, checksumBuf);
+ checksumBuf.flip();
+ checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+ bytesVerified);
+ // Success
+ bytesVerified += bytesRead;
+ // Copy data to persistent file
+ POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress,
+ region.isPmem(), bytesRead);
+ mappedAddress += bytesRead;
+ // Clear buffer
+ blockBuf.clear();
+ checksumBuf.clear();
+ }
+ if (region != null) {
+ POSIX.Pmem.memSync(region);
+ }
+ } finally {
+ IOUtils.closeQuietly(metaChannel);
+ }
+ }
+
+ @Override
+ public boolean isNativeLoader() {
+ return true;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
new file mode 100644
index 0000000000..92012b2d93
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Represents an HDFS block that is mapped to persistent memory by the DataNode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NativePmemMappedBlock implements MappableBlock {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NativePmemMappedBlock.class);
+
+ private long pmemMappedAddress = -1L;
+ private long length;
+ private ExtendedBlockId key;
+
+ NativePmemMappedBlock(long pmemMappedAddress, long length,
+ ExtendedBlockId key) {
+ assert length > 0;
+ this.pmemMappedAddress = pmemMappedAddress;
+ this.length = length;
+ this.key = key;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public long getAddress() {
+ return pmemMappedAddress;
+ }
+
+ @Override
+ public void close() {
+ if (pmemMappedAddress != -1L) {
+ String cacheFilePath =
+ PmemVolumeManager.getInstance().getCachePath(key);
+ try {
+ // The current libpmem reports an error when pmem_unmap is called with a
+ // length that is not aligned to the page size, even though that length
+ // was returned by pmem_map_file.
+ boolean success =
+ NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length);
+ if (!success) {
+ throw new IOException("Failed to unmap the mapped file from " +
+ "pmem address: " + pmemMappedAddress);
+ }
+ pmemMappedAddress = -1L;
+ FsDatasetUtil.deleteMappedFile(cacheFilePath);
+ LOG.info("Successfully uncached one replica:{} from persistent memory"
+ + ", [cached path={}, length={}]", key, cacheFilePath, length);
+ } catch (IOException e) {
+ LOG.warn("IOException occurred for block {}!", key, e);
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 239fff815b..70a42c41f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
- LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
+ LOG.info("Initializing cache loader: " + this.getClass().getName());
DNConf dnConf = cacheManager.getDnConf();
PmemVolumeManager.init(dnConf.getPmemVolumes());
pmemVolumeManager = PmemVolumeManager.getInstance();
@@ -71,8 +71,7 @@ void initialize(FsDatasetCache cacheManager) throws IOException {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException {
PmemMappedBlock mappableBlock = null;
String cachePath = null;
@@ -132,6 +131,11 @@ public boolean isTransientCache() {
return false;
}
+ @Override
+ public boolean isNativeLoader() {
+ return false;
+ }
+
@Override
void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index 25c3d400bd..a49626a321 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -49,6 +49,11 @@ public long getLength() {
return length;
}
+ @Override
+ public long getAddress() {
+ return -1L;
+ }
+
@Override
public void close() {
String cacheFilePath =