HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add ReadaheadPool infrastructure for use in HDFS and MR. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-10-27 22:19:13 +00:00
parent 646e855f6e
commit 78336e717b
7 changed files with 474 additions and 2 deletions

View File

@ -511,6 +511,9 @@ Release 0.23.0 - Unreleased
HADOOP-7445. Implement bulk checksum verification using efficient native HADOOP-7445. Implement bulk checksum verification using efficient native
code. (todd) code. (todd)
HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add
ReadaheadPool infrastructure for use in HDFS and MR. (todd)
BUG FIXES BUG FIXES
HADOOP-7630. hadoop-metrics2.properties should have a property *.period HADOOP-7630. hadoop-metrics2.properties should have a property *.period

View File

@ -0,0 +1,242 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.nativeio.NativeIO;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Manages a pool of threads which can issue readahead requests on file descriptors.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReadaheadPool {
static final Log LOG = LogFactory.getLog(ReadaheadPool.class);
private static final int POOL_SIZE = 4;
private static final int MAX_POOL_SIZE = 16;
private static final int CAPACITY = 1024;
private final ThreadPoolExecutor pool;
private static ReadaheadPool instance;
/**
* Return the singleton instance for the current process.
*/
public static ReadaheadPool getInstance() {
synchronized (ReadaheadPool.class) {
if (instance == null && NativeIO.isAvailable()) {
instance = new ReadaheadPool();
}
return instance;
}
}
private ReadaheadPool() {
pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
pool.setThreadFactory(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Readahead Thread #%d")
.build());
}
/**
* Issue a request to readahead on the given file descriptor.
*
* @param identifier a textual identifier that will be used in error
* messages (e.g. the file name)
* @param fd the file descriptor to read ahead
* @param curPos the current offset at which reads are being issued
* @param readaheadLength the configured length to read ahead
* @param maxOffsetToRead the maximum offset that will be readahead
* (useful if, for example, only some segment of the file is
* requested by the user). Pass {@link Long.MAX_VALUE} to allow
* readahead to the end of the file.
* @param lastReadahead the result returned by the previous invocation
* of this function on this file descriptor, or null if this is
* the first call
* @return an object representing this outstanding request, or null
* if no readahead was performed
*/
public ReadaheadRequest readaheadStream(
String identifier,
FileDescriptor fd,
long curPos,
long readaheadLength,
long maxOffsetToRead,
ReadaheadRequest lastReadahead) {
Preconditions.checkArgument(curPos <= maxOffsetToRead,
"Readahead position %s higher than maxOffsetToRead %s",
curPos, maxOffsetToRead);
if (readaheadLength <= 0) {
return null;
}
long lastOffset = Long.MIN_VALUE;
if (lastReadahead != null) {
lastOffset = lastReadahead.getOffset();
}
// trigger each readahead when we have reached the halfway mark
// in the previous readahead. This gives the system time
// to satisfy the readahead before we start reading the data.
long nextOffset = lastOffset + readaheadLength / 2;
if (curPos >= nextOffset) {
// cancel any currently pending readahead, to avoid
// piling things up in the queue. Each reader should have at most
// one outstanding request in the queue.
if (lastReadahead != null) {
lastReadahead.cancel();
lastReadahead = null;
}
long length = Math.min(readaheadLength,
maxOffsetToRead - curPos);
if (length <= 0) {
// we've reached the end of the stream
return null;
}
return submitReadahead(identifier, fd, curPos, length);
} else {
return lastReadahead;
}
}
/**
* Submit a request to readahead on the given file descriptor.
* @param identifier a textual identifier used in error messages, etc.
* @param fd the file descriptor to readahead
* @param off the offset at which to start the readahead
* @param len the number of bytes to read
* @return an object representing this pending request
*/
public ReadaheadRequest submitReadahead(
String identifier, FileDescriptor fd, long off, long len) {
ReadaheadRequestImpl req = new ReadaheadRequestImpl(
identifier, fd, off, len);
pool.execute(req);
if (LOG.isTraceEnabled()) {
LOG.trace("submit readahead: " + req);
}
return req;
}
/**
* An outstanding readahead request that has been submitted to
* the pool. This request may be pending or may have been
* completed.
*/
public interface ReadaheadRequest {
/**
* Cancels the request for readahead. This should be used
* if the reader no longer needs the requested data, <em>before</em>
* closing the related file descriptor.
*
* It is safe to use even if the readahead request has already
* been fulfilled.
*/
public void cancel();
/**
* @return the requested offset
*/
public long getOffset();
/**
* @return the requested length
*/
public long getLength();
}
private static class ReadaheadRequestImpl implements Runnable, ReadaheadRequest {
private final String identifier;
private final FileDescriptor fd;
private final long off, len;
private volatile boolean canceled = false;
private ReadaheadRequestImpl(String identifier, FileDescriptor fd, long off, long len) {
this.identifier = identifier;
this.fd = fd;
this.off = off;
this.len = len;
}
public void run() {
if (canceled) return;
// There's a very narrow race here that the file will close right at
// this instant. But if that happens, we'll likely receive an EBADF
// error below, and see that it's canceled, ignoring the error.
// It's also possible that we'll end up requesting readahead on some
// other FD, which may be wasted work, but won't cause a problem.
try {
NativeIO.posixFadviseIfPossible(fd, off, len,
NativeIO.POSIX_FADV_WILLNEED);
} catch (IOException ioe) {
if (canceled) {
// no big deal - the reader canceled the request and closed
// the file.
return;
}
LOG.warn("Failed readahead on " + identifier,
ioe);
}
}
@Override
public void cancel() {
canceled = true;
// We could attempt to remove it from the work queue, but that would
// add complexity. In practice, the work queues remain very short,
// so removing canceled requests has no gain.
}
@Override
public long getOffset() {
return off;
}
@Override
public long getLength() {
return len;
}
@Override
public String toString() {
return "ReadaheadRequestImpl [identifier='" + identifier + "', fd=" + fd
+ ", off=" + off + ", len=" + len + "]";
}
}
}

View File

@ -46,10 +46,41 @@ public class NativeIO {
public static final int O_FSYNC = O_SYNC; public static final int O_FSYNC = O_SYNC;
public static final int O_NDELAY = O_NONBLOCK; public static final int O_NDELAY = O_NONBLOCK;
// Flags for posix_fadvise() from bits/fcntl.h
/* No further special treatment. */
public static final int POSIX_FADV_NORMAL = 0;
/* Expect random page references. */
public static final int POSIX_FADV_RANDOM = 1;
/* Expect sequential page references. */
public static final int POSIX_FADV_SEQUENTIAL = 2;
/* Will need these pages. */
public static final int POSIX_FADV_WILLNEED = 3;
/* Don't need these pages. */
public static final int POSIX_FADV_DONTNEED = 4;
/* Data will be accessed once. */
public static final int POSIX_FADV_NOREUSE = 5;
/* Wait upon writeout of all pages
in the range before performing the
write. */
public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
/* Initiate writeout of all those
dirty pages in the range which are
not presently under writeback. */
public static final int SYNC_FILE_RANGE_WRITE = 2;
/* Wait upon writeout of all pages in
the range after performing the
write. */
public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
private static final Log LOG = LogFactory.getLog(NativeIO.class); private static final Log LOG = LogFactory.getLog(NativeIO.class);
private static boolean nativeLoaded = false; private static boolean nativeLoaded = false;
private static boolean workaroundNonThreadSafePasswdCalls = false; private static boolean workaroundNonThreadSafePasswdCalls = false;
private static boolean fadvisePossible = true;
private static boolean syncFileRangePossible = true;
static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY = static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
"hadoop.workaround.non.threadsafe.getpwuid"; "hadoop.workaround.non.threadsafe.getpwuid";
@ -88,9 +119,58 @@ public static boolean isAvailable() {
/** Wrapper around chmod(2) */ /** Wrapper around chmod(2) */
public static native void chmod(String path, int mode) throws IOException; public static native void chmod(String path, int mode) throws IOException;
/** Wrapper around posix_fadvise(2) */
static native void posix_fadvise(
FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
/** Wrapper around sync_file_range(2) */
static native void sync_file_range(
FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
/** Initialize the JNI method ID and class ID cache */ /** Initialize the JNI method ID and class ID cache */
private static native void initNative(); private static native void initNative();
/**
* Call posix_fadvise on the given file descriptor. See the manpage
* for this syscall for more information. On systems where this
* call is not available, does nothing.
*
* @throws NativeIOException if there is an error with the syscall
*/
public static void posixFadviseIfPossible(
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
if (nativeLoaded && fadvisePossible) {
try {
posix_fadvise(fd, offset, len, flags);
} catch (UnsupportedOperationException uoe) {
fadvisePossible = false;
} catch (UnsatisfiedLinkError ule) {
fadvisePossible = false;
}
}
}
/**
* Call sync_file_range on the given file descriptor. See the manpage
* for this syscall for more information. On systems where this
* call is not available, does nothing.
*
* @throws NativeIOException if there is an error with the syscall
*/
public static void syncFileRangeIfPossible(
FileDescriptor fd, long offset, long nbytes, int flags)
throws NativeIOException {
if (nativeLoaded && syncFileRangePossible) {
try {
sync_file_range(fd, offset, nbytes, flags);
} catch (UnsupportedOperationException uoe) {
syncFileRangePossible = false;
} catch (UnsatisfiedLinkError ule) {
syncFileRangePossible = false;
}
}
}
/** /**
* Result type of the fstat call * Result type of the fstat call

View File

@ -40,6 +40,7 @@ AC_CONFIG_AUX_DIR([config])
AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_HEADER([config.h]) AC_CONFIG_HEADER([config.h])
AC_SYS_LARGEFILE AC_SYS_LARGEFILE
AC_GNU_SOURCE
AM_INIT_AUTOMAKE(hadoop,1.0.0) AM_INIT_AUTOMAKE(hadoop,1.0.0)
@ -57,10 +58,8 @@ if test $JAVA_HOME != ""
then then
JNI_LDFLAGS="-L$JAVA_HOME/jre/lib/$OS_ARCH/server" JNI_LDFLAGS="-L$JAVA_HOME/jre/lib/$OS_ARCH/server"
fi fi
ldflags_bak=$LDFLAGS
LDFLAGS="$LDFLAGS $JNI_LDFLAGS" LDFLAGS="$LDFLAGS $JNI_LDFLAGS"
AC_CHECK_LIB([jvm], [JNI_GetCreatedJavaVMs]) AC_CHECK_LIB([jvm], [JNI_GetCreatedJavaVMs])
LDFLAGS=$ldflags_bak
AC_SUBST([JNI_LDFLAGS]) AC_SUBST([JNI_LDFLAGS])
# Checks for header files. # Checks for header files.
@ -94,6 +93,12 @@ AC_CHECK_HEADERS([snappy-c.h], AC_COMPUTE_NEEDED_DSO(snappy,HADOOP_SNAPPY_LIBRAR
dnl Check for headers needed by the native Group resolution implementation dnl Check for headers needed by the native Group resolution implementation
AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.)) AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.))
dnl check for posix_fadvise
AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(posix_fadvise)])
dnl check for sync_file_range
AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(sync_file_range)])
# Checks for typedefs, structures, and compiler characteristics. # Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST AC_C_CONST

View File

@ -29,6 +29,7 @@
#include <string.h> #include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
#include "org_apache_hadoop.h" #include "org_apache_hadoop.h"
@ -234,6 +235,81 @@ cleanup:
} }
/**
* public static native void posix_fadvise(
* FileDescriptor fd, long offset, long len, int flags);
*/
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_posix_1fadvise(
JNIEnv *env, jclass clazz,
jobject fd_object, jlong offset, jlong len, jint flags)
{
#ifndef HAVE_POSIX_FADVISE
THROW(env, "java/lang/UnsupportedOperationException",
"fadvise support not available");
#else
int fd = fd_get(env, fd_object);
PASS_EXCEPTIONS(env);
int err = 0;
if ((err = posix_fadvise(fd, (off_t)offset, (off_t)len, flags))) {
throw_ioe(env, err);
}
#endif
}
#if defined(HAVE_SYNC_FILE_RANGE)
# define my_sync_file_range sync_file_range
#elif defined(SYS_sync_file_range)
// RHEL 5 kernels have sync_file_range support, but the glibc
// included does not have the library function. We can
// still call it directly, and if it's not supported by the
// kernel, we'd get ENOSYS. See RedHat Bugzilla #518581
static int manual_sync_file_range (int fd, __off64_t from, __off64_t to, unsigned int flags)
{
#ifdef __x86_64__
return syscall( SYS_sync_file_range, fd, from, to, flags);
#else
return syscall (SYS_sync_file_range, fd,
__LONG_LONG_PAIR ((long) (from >> 32), (long) from),
__LONG_LONG_PAIR ((long) (to >> 32), (long) to),
flags);
#endif
}
#define my_sync_file_range manual_sync_file_range
#endif
/**
* public static native void sync_file_range(
* FileDescriptor fd, long offset, long len, int flags);
*/
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_sync_1file_1range(
JNIEnv *env, jclass clazz,
jobject fd_object, jlong offset, jlong len, jint flags)
{
#ifndef my_sync_file_range
THROW(env, "java/lang/UnsupportedOperationException",
"sync_file_range support not available");
#else
int fd = fd_get(env, fd_object);
PASS_EXCEPTIONS(env);
if (my_sync_file_range(fd, (off_t)offset, (off_t)len, flags)) {
if (errno == ENOSYS) {
// we know the syscall number, but it's not compiled
// into the running kernel
THROW(env, "java/lang/UnsupportedOperationException",
"sync_file_range kernel support not available");
return;
} else {
throw_ioe(env, errno);
}
}
#endif
}
/* /*
* public static native FileDescriptor open(String path, int flags, int mode); * public static native FileDescriptor open(String path, int flags, int mode);
*/ */

View File

@ -54,6 +54,11 @@ void fd_deinit(JNIEnv *env) {
* underlying fd, or throw if unavailable * underlying fd, or throw if unavailable
*/ */
int fd_get(JNIEnv* env, jobject obj) { int fd_get(JNIEnv* env, jobject obj) {
if (obj == NULL) {
THROW(env, "java/lang/NullPointerException",
"FileDescriptor object is null");
return -1;
}
return (*env)->GetIntField(env, obj, fd_descriptor); return (*env)->GetIntField(env, obj, fd_descriptor);
} }

View File

@ -19,6 +19,7 @@
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -210,6 +211,66 @@ public void testChmod() throws Exception {
assertPermissions(toChmod, 0644); assertPermissions(toChmod, 0644);
} }
@Test
public void testPosixFadvise() throws Exception {
FileInputStream fis = new FileInputStream("/dev/zero");
try {
NativeIO.posix_fadvise(fis.getFD(), 0, 0,
NativeIO.POSIX_FADV_SEQUENTIAL);
} catch (UnsupportedOperationException uoe) {
// we should just skip the unit test on machines where we don't
// have fadvise support
assumeTrue(false);
} finally {
fis.close();
}
try {
NativeIO.posix_fadvise(fis.getFD(), 0, 1024,
NativeIO.POSIX_FADV_SEQUENTIAL);
fail("Did not throw on bad file");
} catch (NativeIOException nioe) {
assertEquals(Errno.EBADF, nioe.getErrno());
}
try {
NativeIO.posix_fadvise(null, 0, 1024,
NativeIO.POSIX_FADV_SEQUENTIAL);
fail("Did not throw on null file");
} catch (NullPointerException npe) {
// expected
}
}
@Test
public void testSyncFileRange() throws Exception {
FileOutputStream fos = new FileOutputStream(
new File(TEST_DIR, "testSyncFileRange"));
try {
fos.write("foo".getBytes());
NativeIO.sync_file_range(fos.getFD(), 0, 1024,
NativeIO.SYNC_FILE_RANGE_WRITE);
// no way to verify that this actually has synced,
// but if it doesn't throw, we can assume it worked
} catch (UnsupportedOperationException uoe) {
// we should just skip the unit test on machines where we don't
// have fadvise support
assumeTrue(false);
} finally {
fos.close();
}
try {
NativeIO.sync_file_range(fos.getFD(), 0, 1024,
NativeIO.SYNC_FILE_RANGE_WRITE);
fail("Did not throw on bad file");
} catch (NativeIOException nioe) {
assertEquals(Errno.EBADF, nioe.getErrno());
}
}
private void assertPermissions(File f, int expected) throws IOException { private void assertPermissions(File f, int expected) throws IOException {
FileSystem localfs = FileSystem.getLocal(new Configuration()); FileSystem localfs = FileSystem.getLocal(new Configuration());
FsPermission perms = localfs.getFileStatus( FsPermission perms = localfs.getFileStatus(