HDFS-7291. Persist in-memory replicas with appropriate unbuffered copy API on POSIX and Windows. Contributed by Xiaoyu Yao.
This commit is contained in:
parent
69f79bee8b
commit
c6f04f391b
@ -22,22 +22,20 @@
|
|||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.MappedByteBuffer;
|
import java.nio.MappedByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.HardLink;
|
import org.apache.hadoop.fs.HardLink;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
|
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
|
||||||
import org.apache.hadoop.util.NativeCodeLoader;
|
import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
@ -662,7 +660,7 @@ public CachedUid(String username, long timestamp) {
|
|||||||
* user account name, of the format DOMAIN\UserName. This method
|
* user account name, of the format DOMAIN\UserName. This method
|
||||||
* will remove the domain part of the full logon name.
|
* will remove the domain part of the full logon name.
|
||||||
*
|
*
|
||||||
* @param the full principal name containing the domain
|
* @param Fthe full principal name containing the domain
|
||||||
* @return name with domain removed
|
* @return name with domain removed
|
||||||
*/
|
*/
|
||||||
private static String stripDomain(String name) {
|
private static String stripDomain(String name) {
|
||||||
@ -855,24 +853,66 @@ private static native void link0(String src, String dst)
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Unbuffered file copy from src to dst without tainting OS buffer cache
|
* Unbuffered file copy from src to dst without tainting OS buffer cache
|
||||||
* In Linux, it uses sendfile() which uses O_DIRECT flag internally
|
|
||||||
* In Windows, it uses CopyFileEx with COPY_FILE_NO_BUFFERING flag
|
|
||||||
*
|
*
|
||||||
* Note: This does not support FreeBSD/OSX which have a different sendfile()
|
* In POSIX platform:
|
||||||
* semantic. Also, this simple native wrapper does minimal parameter checking
|
* It uses FileChannel#transferTo() which internally attempts
|
||||||
|
* unbuffered IO on OS with native sendfile64() support and falls back to
|
||||||
|
* buffered IO otherwise.
|
||||||
|
*
|
||||||
|
* It minimizes the number of FileChannel#transferTo call by passing the the
|
||||||
|
* src file size directly instead of a smaller size as the 3rd parameter.
|
||||||
|
* This saves the number of sendfile64() system call when native sendfile64()
|
||||||
|
* is supported. In the two fall back cases where sendfile is not supported,
|
||||||
|
* FileChannle#transferTo already has its own batching of size 8 MB and 8 KB,
|
||||||
|
* respectively.
|
||||||
|
*
|
||||||
|
* In Windows Platform:
|
||||||
|
* It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
|
||||||
|
* flag, which is supported on Windows Server 2008 and above.
|
||||||
|
*
|
||||||
|
* Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
|
||||||
|
* platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0)
|
||||||
|
* used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
|
||||||
|
* Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
|
||||||
|
* on Windows simply returns IOS_UNSUPPORTED.
|
||||||
|
*
|
||||||
|
* Note: This simple native wrapper does minimal parameter checking before copy and
|
||||||
|
* consistency check (e.g., size) after copy.
|
||||||
* It is recommended to use wrapper function like
|
* It is recommended to use wrapper function like
|
||||||
* the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs.
|
* the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy
|
||||||
*
|
* checks.
|
||||||
*
|
*
|
||||||
* @param src The source path
|
* @param src The source path
|
||||||
* @param dst The destination path
|
* @param dst The destination path
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void copyFileUnbuffered(File src, File dst) throws IOException {
|
public static void copyFileUnbuffered(File src, File dst) throws IOException {
|
||||||
if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
|
if (nativeLoaded && Shell.WINDOWS) {
|
||||||
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
|
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
|
||||||
} else {
|
} else {
|
||||||
FileUtils.copyFile(src, dst);
|
FileInputStream fis = null;
|
||||||
|
FileOutputStream fos = null;
|
||||||
|
FileChannel input = null;
|
||||||
|
FileChannel output = null;
|
||||||
|
try {
|
||||||
|
fis = new FileInputStream(src);
|
||||||
|
fos = new FileOutputStream(dst);
|
||||||
|
input = fis.getChannel();
|
||||||
|
output = fos.getChannel();
|
||||||
|
long remaining = input.size();
|
||||||
|
long position = 0;
|
||||||
|
long transferred = 0;
|
||||||
|
while (remaining > 0) {
|
||||||
|
transferred = input.transferTo(position, remaining, output);
|
||||||
|
remaining -= transferred;
|
||||||
|
position += transferred;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, output);
|
||||||
|
IOUtils.cleanup(LOG, fos);
|
||||||
|
IOUtils.cleanup(LOG, input);
|
||||||
|
IOUtils.cleanup(LOG, fis);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1155,46 +1155,8 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_copyFileUnbuffered0(
|
|||||||
JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst)
|
JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst)
|
||||||
{
|
{
|
||||||
#ifdef UNIX
|
#ifdef UNIX
|
||||||
#if (defined(__FreeBSD__) || defined(__MACH__))
|
THROW(env, "java/lang/UnsupportedOperationException",
|
||||||
THROW(env, "java/io/IOException",
|
"The function copyFileUnbuffered0 should not be used on Unix. Use FileChannel#transferTo instead.");
|
||||||
"The function copyFileUnbuffered() is not supported on FreeBSD or Mac OS");
|
|
||||||
return;
|
|
||||||
#else
|
|
||||||
const char *src = NULL, *dst = NULL;
|
|
||||||
int srcFd = -1;
|
|
||||||
int dstFd = -1;
|
|
||||||
struct stat s;
|
|
||||||
off_t offset = 0;
|
|
||||||
|
|
||||||
src = (*env)->GetStringUTFChars(env, jsrc, NULL);
|
|
||||||
if (!src) goto cleanup; // exception was thrown
|
|
||||||
dst = (*env)->GetStringUTFChars(env, jdst, NULL);
|
|
||||||
if (!dst) goto cleanup; // exception was thrown
|
|
||||||
|
|
||||||
srcFd = open(src, O_RDONLY);
|
|
||||||
if (srcFd == -1) {
|
|
||||||
throw_ioe(env, errno);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
if (fstat(srcFd, &s) == -1){
|
|
||||||
throw_ioe(env, errno);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
dstFd = open(dst, O_WRONLY | O_CREAT, s.st_mode);
|
|
||||||
if (dstFd == -1) {
|
|
||||||
throw_ioe(env, errno);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
if (sendfile(dstFd, srcFd, &offset, s.st_size) == -1) {
|
|
||||||
throw_ioe(env, errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
if (src) (*env)->ReleaseStringUTFChars(env, jsrc, src);
|
|
||||||
if (dst) (*env)->ReleaseStringUTFChars(env, jdst, dst);
|
|
||||||
if (srcFd != -1) close(srcFd);
|
|
||||||
if (dstFd != -1) close(dstFd);
|
|
||||||
#endif
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -632,7 +632,7 @@ public void testCopyFileUnbuffered() throws Exception {
|
|||||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
File srcFile = new File(TEST_DIR, METHOD_NAME + ".src.dat");
|
File srcFile = new File(TEST_DIR, METHOD_NAME + ".src.dat");
|
||||||
File dstFile = new File(TEST_DIR, METHOD_NAME + ".dst.dat");
|
File dstFile = new File(TEST_DIR, METHOD_NAME + ".dst.dat");
|
||||||
final int fileSize = 0x8FFFFFF; // 128 MB
|
final int fileSize = 0x8000000; // 128 MB
|
||||||
final int SEED = 0xBEEF;
|
final int SEED = 0xBEEF;
|
||||||
final int batchSize = 4096;
|
final int batchSize = 4096;
|
||||||
final int numBatches = fileSize / batchSize;
|
final int numBatches = fileSize / batchSize;
|
||||||
@ -650,7 +650,8 @@ public void testCopyFileUnbuffered() throws Exception {
|
|||||||
mapBuf.put(bytesToWrite);
|
mapBuf.put(bytesToWrite);
|
||||||
}
|
}
|
||||||
NativeIO.copyFileUnbuffered(srcFile, dstFile);
|
NativeIO.copyFileUnbuffered(srcFile, dstFile);
|
||||||
}finally {
|
Assert.assertEquals(srcFile.length(), dstFile.length());
|
||||||
|
} finally {
|
||||||
IOUtils.cleanup(LOG, channel);
|
IOUtils.cleanup(LOG, channel);
|
||||||
IOUtils.cleanup(LOG, raSrcFile);
|
IOUtils.cleanup(LOG, raSrcFile);
|
||||||
FileUtils.deleteQuietly(TEST_DIR);
|
FileUtils.deleteQuietly(TEST_DIR);
|
||||||
|
@ -1301,6 +1301,9 @@ Release 2.6.0 - UNRELEASED
|
|||||||
HDFS-6934. Move checksum computation off the hot path when writing to RAM
|
HDFS-6934. Move checksum computation off the hot path when writing to RAM
|
||||||
disk. (cnauroth)
|
disk. (cnauroth)
|
||||||
|
|
||||||
|
HDFS-7291. Persist in-memory replicas with appropriate unbuffered copy API
|
||||||
|
on POSIX and Windows. (Xiaoyu Yao via cnauroth)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1004,10 +1004,11 @@ public static void rename(File from, File to) throws IOException {
|
|||||||
* This method copies the contents of the specified source file
|
* This method copies the contents of the specified source file
|
||||||
* to the specified destination file using OS specific unbuffered IO.
|
* to the specified destination file using OS specific unbuffered IO.
|
||||||
* The goal is to avoid churning the file system buffer cache when copying
|
* The goal is to avoid churning the file system buffer cache when copying
|
||||||
* large files. TheFileUtils#copyLarge function from apache-commons-io library
|
* large files.
|
||||||
* can be used to achieve this with an internal memory buffer but is less
|
*
|
||||||
* efficient than the native unbuffered APIs such as sendfile() in Linux and
|
* We can't use FileUtils#copyFile from apache-commons-io because it
|
||||||
* CopyFileEx() in Windows wrapped in {@link NativeIO#copyFileUnbuffered}.
|
* is a buffered IO based on FileChannel#transferFrom, which uses MmapByteBuffer
|
||||||
|
* internally.
|
||||||
*
|
*
|
||||||
* The directory holding the destination file is created if it does not exist.
|
* The directory holding the destination file is created if it does not exist.
|
||||||
* If the destination file exists, then this method will delete it first.
|
* If the destination file exists, then this method will delete it first.
|
||||||
|
Loading…
Reference in New Issue
Block a user