HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.

Sammi Chen 2019-06-05 21:33:00 +08:00
parent 309501c6fa
commit d1aad44490
22 changed files with 1166 additions and 12 deletions

View File

@@ -78,6 +78,8 @@ Optional packages:
$ sudo apt-get install fuse libfuse-dev
* ZStandard compression
$ sudo apt-get install zstd
* PMDK library for storage class memory (SCM) as HDFS cache backend
Please refer to http://pmem.io/ and https://github.com/pmem/pmdk
----------------------------------------------------------------------------------
Maven main modules:
@@ -262,6 +264,32 @@ Maven build goals:
invoke, run 'mvn dependency-check:aggregate'. Note that this plugin
requires maven 3.1.1 or greater.
PMDK library build options:
The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing
collection of libraries which have been developed for various use cases, tuned,
validated to production quality, and thoroughly documented. These libraries are built
on the Direct Access (DAX) feature available in both Linux and Windows, which allows
applications direct load/store access to persistent memory by memory-mapping files
on a persistent memory aware file system.
It is currently an optional component, meaning that Hadoop can be built without
this dependency. Note that the library is loaded as a dynamic module at runtime. For
more details please refer to the official sites:
http://pmem.io/ and https://github.com/pmem/pmdk.
* -Drequire.pmdk forces the project to build with the PMDK libraries. With this
option provided, the build will fail if the libpmem library is not found. If this
option is not given, the build will still generate a version of Hadoop with
libhadoop.so, and storage class memory (SCM) backed HDFS cache is still supported
without PMDK. Because PMDK can bring better cache write/read performance, building
with this option is recommended if users plan to use an SCM backed HDFS cache.
* -Dpmdk.lib is used to specify a nonstandard location for the PMDK libraries if
they are not under /usr/lib or /usr/lib64.
* -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution
tar package; it requires that -Dpmdk.lib also be specified, and the build will fail
if -Dpmdk.lib is not given.
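For example, a native build that requires and bundles PMDK might be invoked as
follows (a hypothetical invocation, not taken from this change; adjust the
library path to your environment):
$ mvn package -Pdist,native -DskipTests -Dtar -Drequire.pmdk -Dpmdk.lib=/usr/lib64 -Dbundle.pmdk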
----------------------------------------------------------------------------------
Building components separately

View File

@@ -96,6 +96,12 @@ for i in "$@"; do
--isalbundle=*)
ISALBUNDLE=${i#*=}
;;
--pmdklib=*)
PMDKLIB=${i#*=}
;;
--pmdkbundle=*)
PMDKBUNDLE=${i#*=}
;;
--opensslbinbundle=*)
OPENSSLBINBUNDLE=${i#*=}
;;
@@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then
bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"
bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmdk" "${PMDKLIB}"
fi
# Windows

View File

@@ -682,6 +682,8 @@
<REQUIRE_ISAL>${require.isal}</REQUIRE_ISAL>
<CUSTOM_ISAL_PREFIX>${isal.prefix}</CUSTOM_ISAL_PREFIX>
<CUSTOM_ISAL_LIB>${isal.lib}</CUSTOM_ISAL_LIB>
<REQUIRE_PMDK>${require.pmdk}</REQUIRE_PMDK>
<CUSTOM_PMDK_LIB>${pmdk.lib}</CUSTOM_PMDK_LIB>
<REQUIRE_OPENSSL>${require.openssl}</REQUIRE_OPENSSL>
<CUSTOM_OPENSSL_PREFIX>${openssl.prefix}</CUSTOM_OPENSSL_PREFIX>
<CUSTOM_OPENSSL_LIB>${openssl.lib}</CUSTOM_OPENSSL_LIB>

View File

@@ -121,6 +121,7 @@ else ()
ENDIF(REQUIRE_ZSTD)
endif ()
#Require ISA-L
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("2")
find_library(ISAL_LIBRARY
@@ -159,6 +160,25 @@ else (ISAL_LIBRARY)
ENDIF(REQUIRE_ISAL)
endif (ISAL_LIBRARY)
# Build with PMDK library if -Drequire.pmdk option is specified.
if(REQUIRE_PMDK)
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("1")
find_library(PMDK_LIBRARY
NAMES pmem
PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64)
set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
if(PMDK_LIBRARY)
GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
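# Only the library file name is recorded (as HADOOP_PMDK_LIBRARY); the
# library itself is dlopen()'d at runtime rather than linked at build time.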
set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
else(PMDK_LIBRARY)
MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
endif(PMDK_LIBRARY)
else(REQUIRE_PMDK)
MESSAGE(STATUS "Build without PMDK support.")
endif(REQUIRE_PMDK)
# Build hardware CRC32 acceleration, if supported on the platform.
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop
${SRC}/io/compress/zlib/ZlibDecompressor.c
${BZIP2_SOURCE_FILES}
${SRC}/io/nativeio/NativeIO.c
${PMDK_SOURCE_FILES}
${SRC}/io/nativeio/errno_enum.c
${SRC}/io/nativeio/file_descriptor.c
${SRC}/io/nativeio/SharedFileDescriptorFactory.c

View File

@@ -24,6 +24,7 @@
#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
#cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE

View File

@@ -100,6 +100,48 @@ public static class POSIX {
write. */
public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
/**
* Keeps the support state of PMDK.
*/
public enum SupportState {
UNSUPPORTED(-1),
PMDK_LIB_NOT_FOUND(1),
SUPPORTED(0);
private byte stateCode;
SupportState(int stateCode) {
this.stateCode = (byte) stateCode;
}
public int getStateCode() {
return stateCode;
}
public String getMessage() {
String msg;
switch (stateCode) {
case -1:
msg = "The native code is built without PMDK support.";
break;
case 1:
msg = "The native code is built with PMDK support, but PMDK libs " +
"are NOT found in execution environment or failed to be loaded.";
break;
case 0:
msg = "The native code is built with PMDK support, and PMDK libs " +
"are loaded successfully.";
break;
default:
msg = "The state code: " + stateCode + " is unrecognized!";
}
return msg;
}
}
// Denotes the state of supporting PMDK. The value is set by JNI.
private static SupportState pmdkSupportState =
SupportState.PMDK_LIB_NOT_FOUND;
private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
// Set to true via JNI if possible
@@ -124,6 +166,93 @@ public static void setCacheManipulator(CacheManipulator cacheManipulator) {
POSIX.cacheManipulator = cacheManipulator;
}
// This method is invoked by JNI.
public static void setPmdkSupportState(int stateCode) {
for (SupportState state : SupportState.values()) {
if (state.getStateCode() == stateCode) {
pmdkSupportState = state;
return;
}
}
LOG.error("The state code: " + stateCode + " is unrecognized!");
}
public static boolean isPmdkAvailable() {
LOG.info(pmdkSupportState.getMessage());
return pmdkSupportState == SupportState.SUPPORTED;
}
/**
* Denotes a memory region for a mapped file.
*/
public static class PmemMappedRegion {
private long address;
private long length;
private boolean isPmem;
public PmemMappedRegion(long address, long length, boolean isPmem) {
this.address = address;
this.length = length;
this.isPmem = isPmem;
}
public boolean isPmem() {
return this.isPmem;
}
public long getAddress() {
return this.address;
}
public long getLength() {
return this.length;
}
}
/**
* JNI wrappers of persistent memory operations.
*/
public static class Pmem {
// check whether the address is a Pmem address or DIMM address
public static boolean isPmem(long address, long length) {
return NativeIO.POSIX.isPmemCheck(address, length);
}
// create a pmem file and memory map it
public static PmemMappedRegion mapBlock(String path, long length) {
return NativeIO.POSIX.pmemCreateMapFile(path, length);
}
// unmap a pmem file
public static boolean unmapBlock(long address, long length) {
return NativeIO.POSIX.pmemUnMap(address, length);
}
// copy data from a disk file (src) to a pmem file (dest), without flushing
public static void memCopy(byte[] src, long dest, boolean isPmem,
long length) {
NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
}
// flush the memory content to persistent storage
public static void memSync(PmemMappedRegion region) {
if (region.isPmem()) {
NativeIO.POSIX.pmemDrain();
} else {
NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
}
}
}
private static native boolean isPmemCheck(long address, long length);
private static native PmemMappedRegion pmemCreateMapFile(String path,
long length);
private static native boolean pmemUnMap(long address, long length);
private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
long length);
private static native void pmemDrain();
private static native void pmemSync(long address, long length);
/**
* Used to manipulate the operating system cache.
*/
@@ -143,8 +272,8 @@ public long getOperatingSystemPageSize() {
}
public void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
len, flags);
}
@@ -748,7 +877,7 @@ public CachedUid(String username, long timestamp) {
* user account name, of the format DOMAIN\UserName. This method
* will remove the domain part of the full logon name.
*
* @param name the full principal name containing the domain
* @return name with domain removed
*/
private static String stripDomain(String name) {
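For illustration, the Pmem wrappers added above could be exercised roughly as
follows (a minimal sketch, not part of this change; the pmem path
/mnt/pmem0/blk_1001 and the 4096-byte length are assumptions):

  if (NativeIO.POSIX.isPmdkAvailable()) {
    // Create and map a cache file on a pmem-mounted volume.
    NativeIO.POSIX.PmemMappedRegion region =
        NativeIO.POSIX.Pmem.mapBlock("/mnt/pmem0/blk_1001", 4096);
    byte[] data = new byte[4096];
    // Copy without draining, then force the content to persistence.
    NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(), 4096);
    NativeIO.POSIX.Pmem.memSync(region);
    NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), region.getLength());
  }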

View File

@@ -36,6 +36,10 @@
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#ifdef HADOOP_PMDK_LIBRARY
#include <libpmem.h>
#include "pmdk_load.h"
#endif
#if !(defined(__FreeBSD__) || defined(__MACH__))
#include <sys/sendfile.h>
#endif
@@ -60,6 +64,7 @@
#define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX"
#define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat"
#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion"
#define SET_INT_OR_RETURN(E, C, F) \
{ \
@@ -81,6 +86,12 @@ static jmethodID nioe_ctor;
// Please see HADOOP-7156 for details.
jobject pw_lock_object;
#ifdef HADOOP_PMDK_LIBRARY
// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor
static jclass pmem_region_clazz = NULL;
static jmethodID pmem_region_ctor = NULL;
#endif
/*
* Throw a java.IO.IOException, generating the message from errno.
* NB. this is also used from windows_secure_container_executor.c
@@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) {
nioe_ctor = NULL;
}
#ifdef HADOOP_PMDK_LIBRARY
static int loadPmdkLib(JNIEnv *env) {
char errMsg[1024];
jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS);
if (clazz == NULL) {
return 0; // exception has been raised
}
load_pmdk_lib(errMsg, sizeof(errMsg));
jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V");
if (mid == 0) {
return 0;
}
if (strlen(errMsg) > 0) {
(*env)->CallStaticVoidMethod(env, clazz, mid, 1);
return 0;
}
(*env)->CallStaticVoidMethod(env, clazz, mid, 0);
return 1;
}
static void pmem_region_init(JNIEnv *env, jclass nativeio_class) {
jclass clazz = NULL;
// Find the PmemMappedRegion class
clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS);
if (!clazz) {
THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion class");
return; // exception has been raised
}
// Init PmemMappedRegion class
pmem_region_clazz = (*env)->NewGlobalRef(env, clazz);
if (!pmem_region_clazz) {
THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class");
return; // exception has been raised
}
pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V");
if (!pmem_region_ctor) {
THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion constructor");
return; // exception has been raised
}
}
static void pmem_region_deinit(JNIEnv *env) {
// pmem_region_ctor is a jmethodID, not a JNI object reference, so it must
// not be passed to DeleteGlobalRef; clearing the cached value is enough.
pmem_region_ctor = NULL;
if (pmem_region_clazz != NULL) {
(*env)->DeleteGlobalRef(env, pmem_region_clazz);
pmem_region_clazz = NULL;
}
}
#endif
/*
* private static native void initNative();
*
@@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
#ifdef UNIX
errno_enum_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
#ifdef HADOOP_PMDK_LIBRARY
if (loadPmdkLib(env)) {
pmem_region_init(env, clazz);
}
#endif
#endif
return;
error:
@@ -299,6 +372,9 @@ error:
// class wasn't initted yet
#ifdef UNIX
stat_deinit(env);
#ifdef HADOOP_PMDK_LIBRARY
pmem_region_deinit(env);
#endif
#endif
nioe_deinit(env);
fd_deinit(env);
@@ -1383,3 +1459,179 @@ cleanup:
/**
* vim: sw=2: ts=2: et:
*/
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: isPmemCheck
* Signature: (JJ)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck(
JNIEnv *env, jclass thisClass, jlong address, jlong length) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
jint is_pmem = pmdkLoader->pmem_is_pmem((void *)address, length);
return (is_pmem) ? JNI_TRUE : JNI_FALSE;
#else
THROW(env, "java/lang/UnsupportedOperationException",
"The function isPmemCheck is not supported.");
return JNI_FALSE;
#endif
}
/*
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: pmemCreateMapFile
* Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
*/
JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
/* create a pmem file and memory map it */
const char * path = NULL;
void * pmemaddr = NULL;
size_t mapped_len = 0;
int is_pmem = 1;
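// is_pmem is an output parameter: pmem_map_file sets it to 1 only if the
// whole mapped range resides on persistent memory.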
char msg[1000];
path = (*env)->GetStringUTFChars(env, filePath, NULL);
if (!path) {
THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
return NULL;
}
if (fileLength <= 0) {
(*env)->ReleaseStringUTFChars(env, filePath, path);
THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
return NULL;
}
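// PMEM_FILE_CREATE|PMEM_FILE_EXCL creates the cache file, failing if it
// already exists (analogous to O_CREAT|O_EXCL).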
pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
0666, &mapped_len, &is_pmem);
if (!pmemaddr) {
snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
THROW(env, "java/io/IOException", msg);
(*env)->ReleaseStringUTFChars(env, filePath, path);
return NULL;
}
if (fileLength != mapped_len) {
snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%x, returned length:%x, error msg:%s", path, fileLength, mapped_len, pmem_errormsg());
THROW(env, "java/io/IOException", msg);
(*env)->ReleaseStringUTFChars(env, filePath, path);
return NULL;
}
(*env)->ReleaseStringUTFChars(env, filePath, path);
if ((!pmem_region_clazz) || (!pmem_region_ctor)) {
THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL");
return NULL;
}
jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor, pmemaddr, mapped_len, (jboolean)is_pmem);
return ret;
#else
THROW(env, "java/lang/UnsupportedOperationException",
"The function pmemCreateMapFile is not supported.");
return NULL;
#endif
}
/*
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: pmemUnMap
* Signature: (JJ)V
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap(
JNIEnv *env, jclass thisClass, jlong address, jlong length) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
int succeed = 0;
char msg[1000];
succeed = pmdkLoader->pmem_unmap((void *)address, length);
// succeed = -1 failure; succeed = 0 success
if (succeed != 0) {
snprintf(msg, sizeof(msg), "Failed to unmap region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
THROW(env, "java/io/IOException", msg);
return JNI_FALSE;
} else {
return JNI_TRUE;
}
#else
THROW(env, "java/lang/UnsupportedOperationException",
"The function pmemUnMap is not supported.");
return JNI_FALSE;
#endif
}
/*
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: pmemCopy
* Signature: ([BJJ)V
*/
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy(
JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
char msg[1000];
jbyte* srcBuf = (*env)->GetByteArrayElements(env, buf, 0);
snprintf(msg, sizeof(msg), "Pmem copy content. dest: %x, length: %x, src: %x ", address, length, srcBuf);
if (is_pmem) {
pmdkLoader->pmem_memcpy_nodrain(address, srcBuf, length);
} else {
memcpy(address, srcBuf, length);
}
(*env)->ReleaseByteArrayElements(env, buf, srcBuf, 0);
return;
#else
THROW(env, "java/lang/UnsupportedOperationException",
"The function pmemCopy is not supported.");
#endif
}
/*
* Class: org_apache_hadoop_io_nativeio_NativeIO
* Method: pmemDrain
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain(
JNIEnv *env, jclass thisClass) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
pmdkLoader->pmem_drain();
#else
THROW(env, "java/lang/UnsupportedOperationException",
"The function pmemDrain is not supported.");
#endif
}
/*
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: pmemSync
* Signature: (JJ)V
*/
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync
(JNIEnv * env, jclass thisClass, jlong address, jlong length) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
int succeed = 0;
char msg[1000];
succeed = pmdkLoader->pmem_msync((void *)address, length);
// succeed = -1 failure
if (succeed == -1) {
snprintf(msg, sizeof(msg), "Failed to msync region. address: %ld, length: %ld, error msg: %s", (long)address, (long)length, pmem_errormsg());
THROW(env, "java/io/IOException", msg);
return;
}
#else
THROW(env, "java/lang/UnsupportedOperationException",
"The function pmemSync is not supported.");
#endif
}
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "org_apache_hadoop.h"
#include "pmdk_load.h"
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
#ifdef UNIX
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <dlfcn.h>
#include "config.h"
#endif
PmdkLibLoader * pmdkLoader;
/**
* pmdk_load.c
* Utility for loading the libpmem library and the required functions.
* Building this code does not rely on any libpmem source code, but running
* it relies on successfully loading the dynamic library at runtime.
*
*/
static const char* load_functions() {
#ifdef UNIX
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file");
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap");
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem");
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain");
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain");
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync");
#endif
return NULL;
}
void load_pmdk_lib(char* err, size_t err_len) {
const char* errMsg;
const char* library = NULL;
#ifdef UNIX
Dl_info dl_info;
#else
LPTSTR filename = NULL;
#endif
err[0] = '\0';
if (pmdkLoader != NULL) {
return;
}
pmdkLoader = calloc(1, sizeof(PmdkLibLoader));
// Load PMDK library
#ifdef UNIX
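// HADOOP_PMDK_LIBRARY is the libpmem file name captured by CMake at build
// time; dlopen() searches the standard runtime library paths for it.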
pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (pmdkLoader->libec == NULL) {
snprintf(err, err_len, "Failed to load %s (%s)",
HADOOP_PMDK_LIBRARY, dlerror());
return;
}
// Clear any existing error
dlerror();
#endif
errMsg = load_functions();
if (errMsg != NULL) {
snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
}
#ifdef UNIX
if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
library = dl_info.dli_fname;
}
#else
if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
library = filename;
}
#endif
if (library == NULL) {
library = HADOOP_PMDK_LIBRARY;
}
pmdkLoader->libname = strdup(library);
}

View File

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "org_apache_hadoop.h"
#ifdef UNIX
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <dlfcn.h>
#endif
#ifndef _PMDK_LOAD_H_
#define _PMDK_LOAD_H_
#ifdef UNIX
// For libpmem.h
typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode,
size_t *mapped_lenp, int *is_pmemp);
typedef int (* __d_pmem_unmap)(void *addr, size_t len);
typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len);
typedef void (*__d_pmem_drain)(void);
typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len);
typedef int (* __d_pmem_msync)(const void *addr, size_t len);
#endif
typedef struct __PmdkLibLoader {
// The loaded library handle
void* libec;
char* libname;
__d_pmem_map_file pmem_map_file;
__d_pmem_unmap pmem_unmap;
__d_pmem_is_pmem pmem_is_pmem;
__d_pmem_drain pmem_drain;
__d_pmem_memcpy_nodrain pmem_memcpy_nodrain;
__d_pmem_msync pmem_msync;
} PmdkLibLoader;
extern PmdkLibLoader * pmdkLoader;
/**
* A helper function to dlsym a 'symbol' from a given library-handle.
*/
#ifdef UNIX
static __attribute__ ((unused))
void *myDlsym(void *handle, const char *symbol) {
void *func_ptr = dlsym(handle, symbol);
return func_ptr;
}
/* A helper macro to dlsym the requisite dynamic symbol in NON-JNI env. */
#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \
if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \
return "Failed to load symbol" symbol; \
}
#endif
/**
* Returns 0 if PMDK is not supported, 1 otherwise.
*/
int build_support_pmdk();
/**
* Initialize and load PMDK library, returning error message if any.
*
* @param err The err message buffer.
* @param err_len The length of the message buffer.
*/
void load_pmdk_lib(char* err, size_t err_len);
#endif //_PMDK_LOAD_H_

View File

@@ -25,6 +25,8 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
@@ -782,4 +784,155 @@ public void testNativeFadviseConsts() {
assertTrue("Native POSIX_FADV_NOREUSE const not set",
POSIX_FADV_NOREUSE >= 0);
}
@Test (timeout=10000)
public void testPmemCheckParameters() {
assumeNotWindows("Native PMDK not supported on Windows");
// Skip testing while the build or environment does not support PMDK
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
// Please make sure /mnt/pmem0 is a persistent memory device with total
// volume size 'volumeSize'
String filePath = "/$:";
long length = 0;
long volumeSize = 16 * 1024 * 1024 * 1024L;
// Incorrect file length
try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
fail("Illegal length parameter should be detected");
} catch (Exception e) {
LOG.info(e.getMessage());
}
// Incorrect file length
filePath = "/mnt/pmem0/test_native_io";
length = -1L;
try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
fail("Illegal length parameter should be detected");
} catch (Exception e) {
LOG.info(e.getMessage());
}
}
@Test (timeout=10000)
public void testPmemMapMultipleFiles() {
assumeNotWindows("Native PMDK not supported on Windows");
// Skip testing while the build or environment does not support PMDK
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
// Please make sure /mnt/pmem0 is a persistent memory device with total
// volume size 'volumeSize'
String filePath = "/mnt/pmem0/test_native_io";
long length = 0;
long volumeSize = 16 * 1024 * 1024 * 1024L;
// Multiple files, each with 128MB size, aggregated size exceeds volume
// limit 16GB
length = 128 * 1024 * 1024L;
long fileNumber = volumeSize / length;
LOG.info("File number = " + fileNumber);
for (int i = 0; i < fileNumber; i++) {
String path = filePath + i;
LOG.info("File path = " + path);
NativeIO.POSIX.Pmem.mapBlock(path, length);
}
try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
fail("Request map extra file when persistent memory is all occupied");
} catch (Exception e) {
LOG.info(e.getMessage());
}
}
@Test (timeout=10000)
public void testPmemMapBigFile() {
assumeNotWindows("Native PMDK not supported on Windows");
// Skip testing while the build or environment does not support PMDK
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
// Please make sure /mnt/pmem0 is a persistent memory device with total
// volume size 'volumeSize'
String filePath = "/mnt/pmem0/test_native_io_big";
long length = 0;
long volumeSize = 16 * 1024 * 1024 * 1024L;
// One file length exceeds persistent memory volume 16GB.
length = volumeSize + 1024L;
try {
LOG.info("File length = " + length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
fail("File length exceeds persistent memory total volume size");
} catch (Exception e) {
LOG.info(e.getMessage());
deletePmemMappedFile(filePath);
}
}
@Test (timeout=10000)
public void testPmemCopy() throws IOException {
assumeNotWindows("Native PMDK not supported on Windows");
// Skip testing while the build or environment does not support PMDK
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
// Create and map a block file. Please make sure /mnt/pmem0 is a persistent
// memory device.
String filePath = "/mnt/pmem0/copy";
long length = 4096;
PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length));
// Copy content to mapped file
byte[] data = generateSequentialBytes(0, (int) length);
NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(),
length);
// Read content before pmemSync
byte[] readBuf1 = new byte[(int)length];
IOUtils.readFully(new FileInputStream(filePath), readBuf1, 0, (int)length);
assertArrayEquals(data, readBuf1);
byte[] readBuf2 = new byte[(int)length];
// Sync content to persistent memory twice
NativeIO.POSIX.Pmem.memSync(region);
NativeIO.POSIX.Pmem.memSync(region);
// Read content after pmemSync twice
IOUtils.readFully(new FileInputStream(filePath), readBuf2, 0, (int)length);
assertArrayEquals(data, readBuf2);
//Read content after unmap twice
NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
byte[] readBuf3 = new byte[(int)length];
IOUtils.readFully(new FileInputStream(filePath), readBuf3, 0, (int)length);
assertArrayEquals(data, readBuf3);
}
private static byte[] generateSequentialBytes(int start, int length) {
byte[] result = new byte[length];
for (int i = 0; i < length; i++) {
result[i] = (byte) ((start + i) % 127);
}
return result;
}
private static void deletePmemMappedFile(String filePath) {
try {
if (filePath != null) {
boolean result = Files.deleteIfExists(Paths.get(filePath));
if (!result) {
throw new IOException("Failed to delete " + filePath);
}
}
} catch (Throwable e) {
LOG.error("Failed to delete the mapped file " + filePath +
" from persistent memory", e);
}
}
} }

View File

@@ -214,6 +214,28 @@ String getReplicaCachePath(String bpid, long blockId) {
return PmemVolumeManager.getInstance().getCachePath(key);
}
/**
* Get the cache address on persistent memory for a read operation.
* The cache address is returned by the PMDK lib function when the
* block is mapped to persistent memory.
*
* @param bpid blockPoolId
* @param blockId blockId
* @return address
*/
long getCacheAddress(String bpid, long blockId) {
if (cacheLoader.isTransientCache() ||
!isCached(bpid, blockId)) {
return -1;
}
if (!cacheLoader.isNativeLoader()) {
return -1;
}
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
MappableBlock mappableBlock = mappableBlockMap.get(key).mappableBlock;
return mappableBlock.getAddress();
}
/**
* @return List of cached blocks suitable for translation into a
* {@link BlockListAsLongs} for a cache report.

View File

@@ -803,6 +803,14 @@ private InputStream getBlockInputStreamWithCheckingPmemCache(
String cachePath = cacheManager.getReplicaCachePath(
b.getBlockPoolId(), b.getBlockId());
if (cachePath != null) {
long addr = cacheManager.getCacheAddress(
b.getBlockPoolId(), b.getBlockId());
if (addr != -1) {
LOG.debug("Get InputStream by cache address.");
return FsDatasetUtil.getDirectInputStream(
addr, info.getBlockDataLength());
}
LOG.debug("Get InputStream by cache file path.");
return FsDatasetUtil.getInputStreamAndSeek(
new File(cachePath), seekOffset);
}

View File

@@ -25,7 +25,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -42,6 +45,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
/** Utility methods. */
@InterfaceAudience.Private
@@ -131,6 +135,24 @@ public static InputStream getInputStreamAndSeek(File file, long offset)
}
}
public static InputStream getDirectInputStream(long addr, long length)
throws IOException {
try {
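// DirectByteBuffer's (long address, int capacity) constructor is not
// public, so it is invoked reflectively here to wrap the pmem-mapped
// region in a ByteBuffer without copying it.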
Class<?> directByteBufferClass =
Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor =
directByteBufferClass.getDeclaredConstructor(long.class, int.class);
constructor.setAccessible(true);
ByteBuffer byteBuffer =
(ByteBuffer) constructor.newInstance(addr, (int)length);
return new ByteBufferBackedInputStream(byteBuffer);
} catch (ClassNotFoundException | NoSuchMethodException |
IllegalAccessException | InvocationTargetException |
InstantiationException e) {
throw new IOException(e);
}
}
/**
* Find the meta-file for the specified block file and then return the
* generation stamp from the name of the meta-file. Generally meta file will

View File

@@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable {
* @return the number of bytes that have been cached.
*/
long getLength();
/**
* Get cache address if applicable.
* Return -1 if not applicable.
*/
long getAddress();
}

View File

@@ -64,8 +64,7 @@ public abstract class MappableBlockLoader {
* @return The Mappable block.
*/
abstract MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException;
/**
@@ -106,6 +105,11 @@ abstract MappableBlock load(long length, FileInputStream blockIn,
*/
abstract boolean isTransientCache();
/**
* Check whether this is a native pmem cache loader.
*/
abstract boolean isNativeLoader();
/**
* Clean up cache, can be used during DataNode shutdown.
*/
@@ -117,8 +121,7 @@ void shutdown() {
* Verifies the block's checksum. This is an I/O intensive operation.
*/
protected void verifyChecksum(long length, FileInputStream metaIn,
FileChannel blockChannel, String blockFileName) throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =

View File

@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.io.nativeio.NativeIO;
/**
* Creates MappableBlockLoader.
@@ -42,6 +43,9 @@ public static MappableBlockLoader createCacheLoader(DNConf conf) {
if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
return new MemoryMappableBlockLoader();
}
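// Prefer the native PMDK-backed loader when both libhadoop and the PMDK
// library are available; otherwise fall back to the mmap-based pmem loader.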
if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) {
return new NativePmemMappableBlockLoader();
}
return new PmemMappableBlockLoader();
}
}

View File

@@ -66,8 +66,7 @@ void initialize(FsDatasetCache cacheManager) throws IOException {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException {
MemoryMappedBlock mappableBlock = null;
MappedByteBuffer mmap = null;
@@ -116,4 +115,9 @@ long release(ExtendedBlockId key, long bytesCount) {
public boolean isTransientCache() {
return true;
}
@Override
public boolean isNativeLoader() {
return false;
}
}

View File

@@ -44,6 +44,11 @@ public long getLength() {
return length;
}
@Override
public long getAddress() {
return -1L;
}
@Override
public void close() {
if (mmap != null) {

View File

@@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* Map block to persistent memory with native PMDK libs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
private static final Logger LOG =
LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
super.initialize(cacheManager);
}
/**
* Load the block.
*
* Map the block and verify its checksum.
*
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
* is a persistent memory volume chosen by PmemVolumeManager.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
* start. The caller must close this.
* @param metaIn The meta file input stream. Should be positioned at
* the start. The caller must close this.
* @param blockFileName The block file name, for logging purposes.
* @param key The extended block ID.
*
* @throws IOException If mapping block to persistent memory fails or
* checksum fails.
*
* @return The Mappable block.
*/
@Override
public MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName,
ExtendedBlockId key)
throws IOException {
NativePmemMappedBlock mappableBlock = null;
POSIX.PmemMappedRegion region = null;
String filePath = null;
FileChannel blockChannel = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
assert NativeIO.isAvailable();
filePath = PmemVolumeManager.getInstance().getCachePath(key);
region = POSIX.Pmem.mapBlock(filePath, length);
if (region == null) {
throw new IOException("Failed to map the block " + blockFileName +
" to persistent storage.");
}
verifyChecksumAndMapBlock(region, length, metaIn, blockChannel,
blockFileName);
mappableBlock = new NativePmemMappedBlock(region.getAddress(),
region.getLength(), key);
LOG.info("Successfully cached one replica:{} into persistent memory"
+ ", [cached path={}, address={}, length={}]", key, filePath,
region.getAddress(), length);
} finally {
IOUtils.closeQuietly(blockChannel);
if (mappableBlock == null) {
if (region != null) {
// unmap content from persistent memory
POSIX.Pmem.unmapBlock(region.getAddress(),
region.getLength());
FsDatasetUtil.deleteMappedFile(filePath);
}
}
}
return mappableBlock;
}
/**
* Verifies the block's checksum while mapping the block to persistent memory.
* This is an I/O intensive operation.
*/
private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
long length, FileInputStream metaIn, FileChannel blockChannel,
String blockFileName) throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize())));
FileChannel metaChannel = null;
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
throw new IOException("Cannot get FileChannel" +
" from Block InputStream meta file.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
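// Use an 8 MB read buffer, rounded to a whole number of checksum chunks.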
final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
long mappedAddress = -1L;
if (region != null) {
mappedAddress = region.getAddress();
}
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF.");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException(
"Checksum verification failed for the block " + blockFileName +
": premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.limit(chunks * checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// Success
bytesVerified += bytesRead;
// Copy data to persistent file
POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress,
region.isPmem(), bytesRead);
mappedAddress += bytesRead;
// Clear buffer
blockBuf.clear();
checksumBuf.clear();
}
if (region != null) {
POSIX.Pmem.memSync(region);
}
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
@Override
public boolean isNativeLoader() {
return true;
}
}

View File

@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Represents an HDFS block that is mapped to persistent memory by the DataNode.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NativePmemMappedBlock implements MappableBlock {
private static final Logger LOG =
LoggerFactory.getLogger(NativePmemMappedBlock.class);
private long pmemMappedAddress = -1L;
private long length;
private ExtendedBlockId key;
NativePmemMappedBlock(long pmemMappedAddress, long length,
ExtendedBlockId key) {
assert length > 0;
this.pmemMappedAddress = pmemMappedAddress;
this.length = length;
this.key = key;
}
@Override
public long getLength() {
return length;
}
@Override
public long getAddress() {
return pmemMappedAddress;
}
@Override
public void close() {
if (pmemMappedAddress != -1L) {
String cacheFilePath =
PmemVolumeManager.getInstance().getCachePath(key);
try {
// Current libpmem reports an error when pmem_unmap is called with a
// length that is not aligned to the page size, even though that length
// was returned by pmem_map_file.
boolean success =
NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length);
if (!success) {
throw new IOException("Failed to unmap the mapped file from " +
"pmem address: " + pmemMappedAddress);
}
pmemMappedAddress = -1L;
FsDatasetUtil.deleteMappedFile(cacheFilePath);
LOG.info("Successfully uncached one replica:{} from persistent memory"
+ ", [cached path={}, length={}]", key, cacheFilePath, length);
} catch (IOException e) {
LOG.warn("IOException occurred for block {}!", key, e);
}
}
}
}

View File

@@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
LOG.info("Initializing cache loader: " + this.getClass().getName());
DNConf dnConf = cacheManager.getDnConf();
PmemVolumeManager.init(dnConf.getPmemVolumes());
pmemVolumeManager = PmemVolumeManager.getInstance();
@@ -71,8 +71,7 @@ void initialize(FsDatasetCache cacheManager) throws IOException {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException {
PmemMappedBlock mappableBlock = null;
String cachePath = null;
@@ -132,6 +131,11 @@ public boolean isTransientCache() {
return false;
}
@Override
public boolean isNativeLoader() {
return false;
}
@Override
void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown.");

View File

@@ -49,6 +49,11 @@ public long getLength() {
return length;
}
@Override
public long getAddress() {
return -1L;
}
@Override
public void close() {
String cacheFilePath =